Merge pull request #1362 from obscuren/txpool-cleanup
core: reduce CPU load by reducing calls to checkQueue
This commit is contained in:
		@@ -29,7 +29,7 @@ var (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	maxQueued = 200 // max limit of queued txs per address
 | 
						maxQueued = 64 // max limit of queued txs per address
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type stateFn func() *state.StateDB
 | 
					type stateFn func() *state.StateDB
 | 
				
			||||||
@@ -129,6 +129,17 @@ func (pool *TxPool) State() *state.ManagedState {
 | 
				
			|||||||
	return pool.pendingState
 | 
						return pool.pendingState
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (pool *TxPool) Stats() (pending int, queued int) {
 | 
				
			||||||
 | 
						pool.mu.RLock()
 | 
				
			||||||
 | 
						defer pool.mu.RUnlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pending = len(pool.pending)
 | 
				
			||||||
 | 
						for _, txs := range pool.queue {
 | 
				
			||||||
 | 
							queued += len(txs)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// validateTx checks whether a transaction is valid according
 | 
					// validateTx checks whether a transaction is valid according
 | 
				
			||||||
// to the consensus rules.
 | 
					// to the consensus rules.
 | 
				
			||||||
func (pool *TxPool) validateTx(tx *types.Transaction) error {
 | 
					func (pool *TxPool) validateTx(tx *types.Transaction) error {
 | 
				
			||||||
@@ -214,9 +225,6 @@ func (self *TxPool) add(tx *types.Transaction) error {
 | 
				
			|||||||
		glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
 | 
							glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// check and validate the queueue
 | 
					 | 
				
			||||||
	self.checkQueue()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -245,11 +253,17 @@ func (pool *TxPool) addTx(hash common.Hash, addr common.Address, tx *types.Trans
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Add queues a single transaction in the pool if it is valid.
 | 
					// Add queues a single transaction in the pool if it is valid.
 | 
				
			||||||
func (self *TxPool) Add(tx *types.Transaction) error {
 | 
					func (self *TxPool) Add(tx *types.Transaction) (err error) {
 | 
				
			||||||
	self.mu.Lock()
 | 
						self.mu.Lock()
 | 
				
			||||||
	defer self.mu.Unlock()
 | 
						defer self.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return self.add(tx)
 | 
						err = self.add(tx)
 | 
				
			||||||
 | 
						if err == nil {
 | 
				
			||||||
 | 
							// check and validate the queueue
 | 
				
			||||||
 | 
							self.checkQueue()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// AddTransactions attempts to queue all valid transactions in txs.
 | 
					// AddTransactions attempts to queue all valid transactions in txs.
 | 
				
			||||||
@@ -265,6 +279,9 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) {
 | 
				
			|||||||
			glog.V(logger.Debug).Infof("tx %x\n", h[:4])
 | 
								glog.V(logger.Debug).Infof("tx %x\n", h[:4])
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// check and validate the queueue
 | 
				
			||||||
 | 
						self.checkQueue()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GetTransaction returns a transaction if it is contained in the pool
 | 
					// GetTransaction returns a transaction if it is contained in the pool
 | 
				
			||||||
@@ -327,6 +344,23 @@ func (self *TxPool) RemoveTransactions(txs types.Transactions) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (pool *TxPool) removeTx(hash common.Hash) {
 | 
				
			||||||
 | 
						// delete from pending pool
 | 
				
			||||||
 | 
						delete(pool.pending, hash)
 | 
				
			||||||
 | 
						// delete from queue
 | 
				
			||||||
 | 
						for address, txs := range pool.queue {
 | 
				
			||||||
 | 
							if _, ok := txs[hash]; ok {
 | 
				
			||||||
 | 
								if len(txs) == 1 {
 | 
				
			||||||
 | 
									// if only one tx, remove entire address entry.
 | 
				
			||||||
 | 
									delete(pool.queue, address)
 | 
				
			||||||
 | 
								} else {
 | 
				
			||||||
 | 
									delete(txs, hash)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// checkQueue moves transactions that have become processable to main pool.
 | 
					// checkQueue moves transactions that have become processable to main pool.
 | 
				
			||||||
func (pool *TxPool) checkQueue() {
 | 
					func (pool *TxPool) checkQueue() {
 | 
				
			||||||
	state := pool.pendingState
 | 
						state := pool.pendingState
 | 
				
			||||||
@@ -354,13 +388,19 @@ func (pool *TxPool) checkQueue() {
 | 
				
			|||||||
		for i, e := range addq {
 | 
							for i, e := range addq {
 | 
				
			||||||
			// start deleting the transactions from the queue if they exceed the limit
 | 
								// start deleting the transactions from the queue if they exceed the limit
 | 
				
			||||||
			if i > maxQueued {
 | 
								if i > maxQueued {
 | 
				
			||||||
				if glog.V(logger.Debug) {
 | 
					 | 
				
			||||||
					glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:]))
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				delete(pool.queue[address], e.hash)
 | 
									delete(pool.queue[address], e.hash)
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			if e.Nonce() > guessedNonce {
 | 
								if e.Nonce() > guessedNonce {
 | 
				
			||||||
 | 
									if len(addq)-i > maxQueued {
 | 
				
			||||||
 | 
										if glog.V(logger.Debug) {
 | 
				
			||||||
 | 
											glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(e.hash[:]))
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										for j := i + maxQueued; j < len(addq); j++ {
 | 
				
			||||||
 | 
											delete(txs, addq[j].hash)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
				break
 | 
									break
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			delete(txs, e.hash)
 | 
								delete(txs, e.hash)
 | 
				
			||||||
@@ -373,23 +413,6 @@ func (pool *TxPool) checkQueue() {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (pool *TxPool) removeTx(hash common.Hash) {
 | 
					 | 
				
			||||||
	// delete from pending pool
 | 
					 | 
				
			||||||
	delete(pool.pending, hash)
 | 
					 | 
				
			||||||
	// delete from queue
 | 
					 | 
				
			||||||
	for address, txs := range pool.queue {
 | 
					 | 
				
			||||||
		if _, ok := txs[hash]; ok {
 | 
					 | 
				
			||||||
			if len(txs) == 1 {
 | 
					 | 
				
			||||||
				// if only one tx, remove entire address entry.
 | 
					 | 
				
			||||||
				delete(pool.queue, address)
 | 
					 | 
				
			||||||
			} else {
 | 
					 | 
				
			||||||
				delete(txs, hash)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// validatePool removes invalid and processed transactions from the main pool.
 | 
					// validatePool removes invalid and processed transactions from the main pool.
 | 
				
			||||||
func (pool *TxPool) validatePool() {
 | 
					func (pool *TxPool) validatePool() {
 | 
				
			||||||
	state := pool.currentState()
 | 
						state := pool.currentState()
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -181,6 +181,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
 | 
				
			|||||||
	if err := pool.add(tx2); err != nil {
 | 
						if err := pool.add(tx2); err != nil {
 | 
				
			||||||
		t.Error("didn't expect error", err)
 | 
							t.Error("didn't expect error", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pool.checkQueue()
 | 
				
			||||||
	if len(pool.pending) != 2 {
 | 
						if len(pool.pending) != 2 {
 | 
				
			||||||
		t.Error("expected 2 pending txs. Got", len(pool.pending))
 | 
							t.Error("expected 2 pending txs. Got", len(pool.pending))
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -68,8 +68,9 @@ func (self *txPoolApi) ApiVersion() string {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (self *txPoolApi) Status(req *shared.Request) (interface{}, error) {
 | 
					func (self *txPoolApi) Status(req *shared.Request) (interface{}, error) {
 | 
				
			||||||
 | 
						pending, queue := self.ethereum.TxPool().Stats()
 | 
				
			||||||
	return map[string]int{
 | 
						return map[string]int{
 | 
				
			||||||
		"pending": self.ethereum.TxPool().GetTransactions().Len(),
 | 
							"pending": pending,
 | 
				
			||||||
		"queued":  self.ethereum.TxPool().GetQueuedTransactions().Len(),
 | 
							"queued":  queue,
 | 
				
			||||||
	}, nil
 | 
						}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user