cmd, core, eth: journal local transactions to disk (#14784)

* core: reduce txpool event loop goroutines and sync structs

* cmd, core, eth: journal local transactions to disk

* core: journal replacement pending transactions too

* core: separate transaction journal from pool
This commit is contained in:
Péter Szilágyi
2017-07-28 15:09:39 +02:00
committed by GitHub
parent a602ee90f2
commit 3d32690b54
7 changed files with 416 additions and 84 deletions

View File

@ -19,8 +19,10 @@ package core
import (
"crypto/ecdsa"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"os"
"testing"
"time"
@ -33,6 +35,15 @@ import (
"github.com/ethereum/go-ethereum/params"
)
// testTxPoolConfig is a transaction pool configuration without stateful disk
// sideeffects used during testing.
var testTxPoolConfig TxPoolConfig
func init() {
testTxPoolConfig = DefaultTxPoolConfig
testTxPoolConfig.Journal = ""
}
func transaction(nonce uint64, gaslimit *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
return pricedTransaction(nonce, gaslimit, big.NewInt(1), key)
}
@ -47,8 +58,7 @@ func setupTxPool() (*TxPool, *ecdsa.PrivateKey) {
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
key, _ := crypto.GenerateKey()
pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
pool.resetState()
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
return pool, key
}
@ -125,9 +135,8 @@ func TestStateChangeDuringPoolReset(t *testing.T) {
gasLimitFunc := func() *big.Int { return big.NewInt(1000000000) }
pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc)
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, mux, stateFunc, gasLimitFunc)
defer pool.Stop()
pool.resetState()
nonce := pool.State().GetNonce(address)
if nonce != 0 {
@ -618,25 +627,25 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
pool.resetState()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= DefaultTxPoolConfig.AccountQueue+5; i++ {
for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ {
if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
if len(pool.pending) != 0 {
t.Errorf("tx %d: pending pool size mismatch: have %d, want %d", i, len(pool.pending), 0)
}
if i <= DefaultTxPoolConfig.AccountQueue {
if i <= testTxPoolConfig.AccountQueue {
if pool.queue[account].Len() != int(i) {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), i)
}
} else {
if pool.queue[account].Len() != int(DefaultTxPoolConfig.AccountQueue) {
t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), DefaultTxPoolConfig.AccountQueue)
if pool.queue[account].Len() != int(testTxPoolConfig.AccountQueue) {
t.Errorf("tx %d: queue limit mismatch: have %d, want %d", i, pool.queue[account].Len(), testTxPoolConfig.AccountQueue)
}
}
}
if len(pool.all) != int(DefaultTxPoolConfig.AccountQueue) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), DefaultTxPoolConfig.AccountQueue)
if len(pool.all) != int(testTxPoolConfig.AccountQueue) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue)
}
}
@ -657,13 +666,12 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
config := DefaultTxPoolConfig
config := testTxPoolConfig
config.NoLocals = nolocals
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
defer pool.Stop()
pool.resetState()
// Create a number of test accounts and fund them (last one will be the local)
state, _ := pool.currentState()
@ -748,13 +756,12 @@ func testTransactionQueueTimeLimiting(t *testing.T, nolocals bool) {
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
config := DefaultTxPoolConfig
config := testTxPoolConfig
config.Lifetime = 250 * time.Millisecond
config.NoLocals = nolocals
pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
defer pool.Stop()
pool.resetState()
// Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey()
@ -817,7 +824,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
pool.resetState()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ {
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
if err := pool.AddRemote(transaction(i, big.NewInt(100000), key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
@ -828,8 +835,8 @@ func TestTransactionPendingLimiting(t *testing.T) {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0)
}
}
if len(pool.all) != int(DefaultTxPoolConfig.AccountQueue+5) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), DefaultTxPoolConfig.AccountQueue+5)
if len(pool.all) != int(testTxPoolConfig.AccountQueue+5) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue+5)
}
}
@ -845,7 +852,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
state1, _ := pool1.currentState()
state1.AddBalance(account1, big.NewInt(1000000))
for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ {
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
if err := pool1.AddRemote(transaction(origin+i, big.NewInt(100000), key1)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
@ -857,7 +864,7 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
state2.AddBalance(account2, big.NewInt(1000000))
txns := []*types.Transaction{}
for i := uint64(0); i < DefaultTxPoolConfig.AccountQueue+5; i++ {
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
txns = append(txns, transaction(origin+i, big.NewInt(100000), key2))
}
pool2.AddRemotes(txns)
@ -888,12 +895,11 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
config := DefaultTxPoolConfig
config := testTxPoolConfig
config.GlobalSlots = config.AccountSlots * 10
pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
defer pool.Stop()
pool.resetState()
// Create a number of test accounts and fund them
state, _ := pool.currentState()
@ -935,14 +941,13 @@ func TestTransactionCapClearsFromAll(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
config := DefaultTxPoolConfig
config := testTxPoolConfig
config.AccountSlots = 2
config.AccountQueue = 2
config.GlobalSlots = 8
pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
defer pool.Stop()
pool.resetState()
// Create a number of test accounts and fund them
state, _ := pool.currentState()
@ -970,12 +975,11 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
config := DefaultTxPoolConfig
config := testTxPoolConfig
config.GlobalSlots = 0
pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
defer pool.Stop()
pool.resetState()
// Create a number of test accounts and fund them
state, _ := pool.currentState()
@ -1019,9 +1023,8 @@ func TestTransactionPoolRepricing(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
defer pool.Stop()
pool.resetState()
// Create a number of test accounts and fund them
state, _ := pool.currentState()
@ -1104,13 +1107,12 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
config := DefaultTxPoolConfig
config := testTxPoolConfig
config.GlobalSlots = 2
config.GlobalQueue = 2
pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
defer pool.Stop()
pool.resetState()
// Create a number of test accounts and fund them
state, _ := pool.currentState()
@ -1192,9 +1194,8 @@ func TestTransactionReplacement(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
pool := NewTxPool(DefaultTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
defer pool.Stop()
pool.resetState()
// Create a test account to add transactions with
key, _ := crypto.GenerateKey()
@ -1204,7 +1205,7 @@ func TestTransactionReplacement(t *testing.T) {
// Add pending transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
price := int64(100)
threshold := (price * (100 + int64(DefaultTxPoolConfig.PriceBump))) / 100
threshold := (price * (100 + int64(testTxPoolConfig.PriceBump))) / 100
if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap pending transaction: %v", err)
@ -1250,6 +1251,113 @@ func TestTransactionReplacement(t *testing.T) {
}
}
// Tests that local transactions are journaled to disk, but remote transactions
// get discarded between restarts.
func TestTransactionJournaling(t *testing.T) { testTransactionJournaling(t, false) }
func TestTransactionJournalingNoLocals(t *testing.T) { testTransactionJournaling(t, true) }
func testTransactionJournaling(t *testing.T, nolocals bool) {
// Create a temporary file for the journal
file, err := ioutil.TempFile("", "")
if err != nil {
t.Fatalf("failed to create temporary journal: %v", err)
}
journal := file.Name()
defer os.Remove(journal)
// Clean up the temporary file, we only need the path for now
file.Close()
os.Remove(journal)
// Create the original pool to inject transaction into the journal
db, _ := ethdb.NewMemDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
config := testTxPoolConfig
config.NoLocals = nolocals
config.Journal = journal
config.Rejournal = time.Second
pool := NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
// Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey()
remote, _ := crypto.GenerateKey()
statedb, _ = pool.currentState()
statedb.AddBalance(crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
statedb.AddBalance(crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
// Add three local and a remote transactions and ensure they are queued up
if err := pool.AddLocal(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
if err := pool.AddLocal(pricedTransaction(1, big.NewInt(100000), big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
if err := pool.AddLocal(pricedTransaction(2, big.NewInt(100000), big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, big.NewInt(100000), big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
pending, queued := pool.stats()
if pending != 4 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
}
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
pool.Stop()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
pending, queued = pool.stats()
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if nolocals {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
} else {
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
pool.resetState()
time.Sleep(2 * config.Rejournal)
pool.Stop()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
pool = NewTxPool(config, params.TestChainConfig, new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) })
pending, queued = pool.stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if nolocals {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
} else {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
}
// Benchmarks the speed of validating the contents of the pending queue of the
// transaction pool.
func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) }