Merge pull request #16494 from karalabe/txpool-stable-pricedelete
core: txpool stable underprice drop order, perf fixes
This commit is contained in:
		@@ -367,9 +367,20 @@ func (l *txList) Flatten() types.Transactions {
 | 
				
			|||||||
// price-sorted transactions to discard when the pool fills up.
 | 
					// price-sorted transactions to discard when the pool fills up.
 | 
				
			||||||
type priceHeap []*types.Transaction
 | 
					type priceHeap []*types.Transaction
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (h priceHeap) Len() int           { return len(h) }
 | 
					func (h priceHeap) Len() int      { return len(h) }
 | 
				
			||||||
func (h priceHeap) Less(i, j int) bool { return h[i].GasPrice().Cmp(h[j].GasPrice()) < 0 }
 | 
					func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
 | 
				
			||||||
func (h priceHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
 | 
					
 | 
				
			||||||
 | 
					func (h priceHeap) Less(i, j int) bool {
 | 
				
			||||||
 | 
						// Sort primarily by price, returning the cheaper one
 | 
				
			||||||
 | 
						switch h[i].GasPrice().Cmp(h[j].GasPrice()) {
 | 
				
			||||||
 | 
						case -1:
 | 
				
			||||||
 | 
							return true
 | 
				
			||||||
 | 
						case 1:
 | 
				
			||||||
 | 
							return false
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// If the prices match, stabilize via nonces (high nonce is worse)
 | 
				
			||||||
 | 
						return h[i].Nonce() > h[j].Nonce()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (h *priceHeap) Push(x interface{}) {
 | 
					func (h *priceHeap) Push(x interface{}) {
 | 
				
			||||||
	*h = append(*h, x.(*types.Transaction))
 | 
						*h = append(*h, x.(*types.Transaction))
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -320,7 +320,7 @@ func (pool *TxPool) loop() {
 | 
				
			|||||||
				// Any non-locals old enough should be removed
 | 
									// Any non-locals old enough should be removed
 | 
				
			||||||
				if time.Since(pool.beats[addr]) > pool.config.Lifetime {
 | 
									if time.Since(pool.beats[addr]) > pool.config.Lifetime {
 | 
				
			||||||
					for _, tx := range pool.queue[addr].Flatten() {
 | 
										for _, tx := range pool.queue[addr].Flatten() {
 | 
				
			||||||
						pool.removeTx(tx.Hash())
 | 
											pool.removeTx(tx.Hash(), true)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -468,7 +468,7 @@ func (pool *TxPool) SetGasPrice(price *big.Int) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	pool.gasPrice = price
 | 
						pool.gasPrice = price
 | 
				
			||||||
	for _, tx := range pool.priced.Cap(price, pool.locals) {
 | 
						for _, tx := range pool.priced.Cap(price, pool.locals) {
 | 
				
			||||||
		pool.removeTx(tx.Hash())
 | 
							pool.removeTx(tx.Hash(), false)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	log.Info("Transaction pool price threshold updated", "price", price)
 | 
						log.Info("Transaction pool price threshold updated", "price", price)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -630,7 +630,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
 | 
				
			|||||||
		for _, tx := range drop {
 | 
							for _, tx := range drop {
 | 
				
			||||||
			log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
 | 
								log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
 | 
				
			||||||
			underpricedTxCounter.Inc(1)
 | 
								underpricedTxCounter.Inc(1)
 | 
				
			||||||
			pool.removeTx(tx.Hash())
 | 
								pool.removeTx(tx.Hash(), false)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// If the transaction is replacing an already pending one, do directly
 | 
						// If the transaction is replacing an already pending one, do directly
 | 
				
			||||||
@@ -695,8 +695,10 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
 | 
				
			|||||||
		pool.priced.Removed()
 | 
							pool.priced.Removed()
 | 
				
			||||||
		queuedReplaceCounter.Inc(1)
 | 
							queuedReplaceCounter.Inc(1)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pool.all[hash] = tx
 | 
						if pool.all[hash] == nil {
 | 
				
			||||||
	pool.priced.Put(tx)
 | 
							pool.all[hash] = tx
 | 
				
			||||||
 | 
							pool.priced.Put(tx)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	return old != nil, nil
 | 
						return old != nil, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -862,7 +864,7 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// removeTx removes a single transaction from the queue, moving all subsequent
 | 
					// removeTx removes a single transaction from the queue, moving all subsequent
 | 
				
			||||||
// transactions back to the future queue.
 | 
					// transactions back to the future queue.
 | 
				
			||||||
func (pool *TxPool) removeTx(hash common.Hash) {
 | 
					func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
 | 
				
			||||||
	// Fetch the transaction we wish to delete
 | 
						// Fetch the transaction we wish to delete
 | 
				
			||||||
	tx, ok := pool.all[hash]
 | 
						tx, ok := pool.all[hash]
 | 
				
			||||||
	if !ok {
 | 
						if !ok {
 | 
				
			||||||
@@ -872,8 +874,9 @@ func (pool *TxPool) removeTx(hash common.Hash) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// Remove it from the list of known transactions
 | 
						// Remove it from the list of known transactions
 | 
				
			||||||
	delete(pool.all, hash)
 | 
						delete(pool.all, hash)
 | 
				
			||||||
	pool.priced.Removed()
 | 
						if outofbound {
 | 
				
			||||||
 | 
							pool.priced.Removed()
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	// Remove the transaction from the pending lists and reset the account nonce
 | 
						// Remove the transaction from the pending lists and reset the account nonce
 | 
				
			||||||
	if pending := pool.pending[addr]; pending != nil {
 | 
						if pending := pool.pending[addr]; pending != nil {
 | 
				
			||||||
		if removed, invalids := pending.Remove(tx); removed {
 | 
							if removed, invalids := pending.Remove(tx); removed {
 | 
				
			||||||
@@ -1052,7 +1055,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
 | 
				
			|||||||
			// Drop all transactions if they are less than the overflow
 | 
								// Drop all transactions if they are less than the overflow
 | 
				
			||||||
			if size := uint64(list.Len()); size <= drop {
 | 
								if size := uint64(list.Len()); size <= drop {
 | 
				
			||||||
				for _, tx := range list.Flatten() {
 | 
									for _, tx := range list.Flatten() {
 | 
				
			||||||
					pool.removeTx(tx.Hash())
 | 
										pool.removeTx(tx.Hash(), true)
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				drop -= size
 | 
									drop -= size
 | 
				
			||||||
				queuedRateLimitCounter.Inc(int64(size))
 | 
									queuedRateLimitCounter.Inc(int64(size))
 | 
				
			||||||
@@ -1061,7 +1064,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
 | 
				
			|||||||
			// Otherwise drop only last few transactions
 | 
								// Otherwise drop only last few transactions
 | 
				
			||||||
			txs := list.Flatten()
 | 
								txs := list.Flatten()
 | 
				
			||||||
			for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
 | 
								for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
 | 
				
			||||||
				pool.removeTx(txs[i].Hash())
 | 
									pool.removeTx(txs[i].Hash(), true)
 | 
				
			||||||
				drop--
 | 
									drop--
 | 
				
			||||||
				queuedRateLimitCounter.Inc(1)
 | 
									queuedRateLimitCounter.Inc(1)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -209,15 +209,10 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	pool.lockedReset(nil, nil)
 | 
						pool.lockedReset(nil, nil)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	pendingTx, err := pool.Pending()
 | 
						_, err := pool.Pending()
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		t.Fatalf("Could not fetch pending transactions: %v", err)
 | 
							t.Fatalf("Could not fetch pending transactions: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	for addr, txs := range pendingTx {
 | 
					 | 
				
			||||||
		t.Logf("%0x: %d\n", addr, len(txs))
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	nonce = pool.State().GetNonce(address)
 | 
						nonce = pool.State().GetNonce(address)
 | 
				
			||||||
	if nonce != 2 {
 | 
						if nonce != 2 {
 | 
				
			||||||
		t.Fatalf("Invalid nonce, want 2, got %d", nonce)
 | 
							t.Fatalf("Invalid nonce, want 2, got %d", nonce)
 | 
				
			||||||
@@ -350,7 +345,7 @@ func TestTransactionChainFork(t *testing.T) {
 | 
				
			|||||||
	if _, err := pool.add(tx, false); err != nil {
 | 
						if _, err := pool.add(tx, false); err != nil {
 | 
				
			||||||
		t.Error("didn't expect error", err)
 | 
							t.Error("didn't expect error", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pool.removeTx(tx.Hash())
 | 
						pool.removeTx(tx.Hash(), true)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// reset the pool's internal state
 | 
						// reset the pool's internal state
 | 
				
			||||||
	resetState()
 | 
						resetState()
 | 
				
			||||||
@@ -1388,13 +1383,13 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 | 
				
			|||||||
		t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
 | 
							t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// Ensure that adding high priced transactions drops cheap ones, but not own
 | 
						// Ensure that adding high priced transactions drops cheap ones, but not own
 | 
				
			||||||
	if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
 | 
						if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { // +K1:0 => -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
 | 
				
			||||||
		t.Fatalf("failed to add well priced transaction: %v", err)
 | 
							t.Fatalf("failed to add well priced transaction: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil {
 | 
						if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2
 | 
				
			||||||
		t.Fatalf("failed to add well priced transaction: %v", err)
 | 
							t.Fatalf("failed to add well priced transaction: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil {
 | 
						if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
 | 
				
			||||||
		t.Fatalf("failed to add well priced transaction: %v", err)
 | 
							t.Fatalf("failed to add well priced transaction: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	pending, queued = pool.Stats()
 | 
						pending, queued = pool.Stats()
 | 
				
			||||||
@@ -1404,7 +1399,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 | 
				
			|||||||
	if queued != 2 {
 | 
						if queued != 2 {
 | 
				
			||||||
		t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
 | 
							t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if err := validateEvents(events, 2); err != nil {
 | 
						if err := validateEvents(events, 1); err != nil {
 | 
				
			||||||
		t.Fatalf("additional event firing failed: %v", err)
 | 
							t.Fatalf("additional event firing failed: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	if err := validateTxPoolInternals(pool); err != nil {
 | 
						if err := validateTxPoolInternals(pool); err != nil {
 | 
				
			||||||
@@ -1430,6 +1425,74 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Tests that more expensive transactions push out cheap ones from the pool, but
 | 
				
			||||||
 | 
					// without producing instability by creating gaps that start jumping transactions
 | 
				
			||||||
 | 
					// back and forth between queued/pending.
 | 
				
			||||||
 | 
					func TestTransactionPoolStableUnderpricing(t *testing.T) {
 | 
				
			||||||
 | 
						t.Parallel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Create the pool to test the pricing enforcement with
 | 
				
			||||||
 | 
						db, _ := ethdb.NewMemDatabase()
 | 
				
			||||||
 | 
						statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
 | 
				
			||||||
 | 
						blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						config := testTxPoolConfig
 | 
				
			||||||
 | 
						config.GlobalSlots = 128
 | 
				
			||||||
 | 
						config.GlobalQueue = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pool := NewTxPool(config, params.TestChainConfig, blockchain)
 | 
				
			||||||
 | 
						defer pool.Stop()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Keep track of transaction events to ensure all executables get announced
 | 
				
			||||||
 | 
						events := make(chan TxPreEvent, 32)
 | 
				
			||||||
 | 
						sub := pool.txFeed.Subscribe(events)
 | 
				
			||||||
 | 
						defer sub.Unsubscribe()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Create a number of test accounts and fund them
 | 
				
			||||||
 | 
						keys := make([]*ecdsa.PrivateKey, 2)
 | 
				
			||||||
 | 
						for i := 0; i < len(keys); i++ {
 | 
				
			||||||
 | 
							keys[i], _ = crypto.GenerateKey()
 | 
				
			||||||
 | 
							pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Fill up the entire queue with the same transaction price points
 | 
				
			||||||
 | 
						txs := types.Transactions{}
 | 
				
			||||||
 | 
						for i := uint64(0); i < config.GlobalSlots; i++ {
 | 
				
			||||||
 | 
							txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0]))
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pool.AddRemotes(txs)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						pending, queued := pool.Stats()
 | 
				
			||||||
 | 
						if pending != int(config.GlobalSlots) {
 | 
				
			||||||
 | 
							t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if queued != 0 {
 | 
				
			||||||
 | 
							t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := validateEvents(events, int(config.GlobalSlots)); err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("original event firing failed: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := validateTxPoolInternals(pool); err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("pool internal state corrupted: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap
 | 
				
			||||||
 | 
						if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("failed to add well priced transaction: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						pending, queued = pool.Stats()
 | 
				
			||||||
 | 
						if pending != int(config.GlobalSlots) {
 | 
				
			||||||
 | 
							t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if queued != 0 {
 | 
				
			||||||
 | 
							t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := validateEvents(events, 1); err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("additional event firing failed: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := validateTxPoolInternals(pool); err != nil {
 | 
				
			||||||
 | 
							t.Fatalf("pool internal state corrupted: %v", err)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Tests that the pool rejects replacement transactions that don't meet the minimum
 | 
					// Tests that the pool rejects replacement transactions that don't meet the minimum
 | 
				
			||||||
// price bump required.
 | 
					// price bump required.
 | 
				
			||||||
func TestTransactionReplacement(t *testing.T) {
 | 
					func TestTransactionReplacement(t *testing.T) {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user