Merge pull request #1889 from karalabe/fast-sync-rebase

eth/63 fast synchronization algorithm
This commit is contained in:
Jeffrey Wilcke
2015-10-21 11:44:22 -07:00
53 changed files with 5047 additions and 1802 deletions

View File

@ -163,7 +163,7 @@ func benchInsertChain(b *testing.B, disk bool, gen func(int, *BlockGen)) {
// Generate a chain of b.N blocks using the supplied block
// generator function.
genesis := WriteGenesisBlockForTesting(db, GenesisAccount{benchRootAddr, benchRootFunds})
chain := GenerateChain(genesis, db, b.N, gen)
chain, _ := GenerateChain(genesis, db, b.N, gen)
// Time the insertion of the new chain.
// State and blocks are stored in the same DB.

View File

@ -128,7 +128,7 @@ func (self *BlockProcessor) ApplyTransaction(gp *GasPool, statedb *state.StateDB
}
logs := statedb.GetLogs(tx.Hash())
receipt.SetLogs(logs)
receipt.Logs = logs
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
glog.V(logger.Debug).Infoln(receipt)
@ -212,14 +212,16 @@ func (sm *BlockProcessor) Process(block *types.Block) (logs vm.Logs, receipts ty
defer sm.mutex.Unlock()
if sm.bc.HasBlock(block.Hash()) {
return nil, nil, &KnownBlockError{block.Number(), block.Hash()}
if _, err := state.New(block.Root(), sm.chainDb); err == nil {
return nil, nil, &KnownBlockError{block.Number(), block.Hash()}
}
}
if !sm.bc.HasBlock(block.ParentHash()) {
return nil, nil, ParentError(block.ParentHash())
if parent := sm.bc.GetBlock(block.ParentHash()); parent != nil {
if _, err := state.New(parent.Root(), sm.chainDb); err == nil {
return sm.processWithParent(block, parent)
}
}
parent := sm.bc.GetBlock(block.ParentHash())
return sm.processWithParent(block, parent)
return nil, nil, ParentError(block.ParentHash())
}
func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs vm.Logs, receipts types.Receipts, err error) {
@ -381,18 +383,40 @@ func (sm *BlockProcessor) GetLogs(block *types.Block) (logs vm.Logs, err error)
receipts := GetBlockReceipts(sm.chainDb, block.Hash())
// coalesce logs
for _, receipt := range receipts {
logs = append(logs, receipt.Logs()...)
logs = append(logs, receipt.Logs...)
}
return logs, nil
}
// ValidateHeader verifies the validity of a header, relying on the database and
// POW behind the block processor.
func (sm *BlockProcessor) ValidateHeader(header *types.Header, checkPow, uncle bool) error {
// Short circuit if the header's already known or its parent missing
if sm.bc.HasHeader(header.Hash()) {
return nil
}
if parent := sm.bc.GetHeader(header.ParentHash); parent == nil {
return ParentError(header.ParentHash)
} else {
return ValidateHeader(sm.Pow, header, parent, checkPow, uncle)
}
}
// ValidateHeaderWithParent verifies the validity of a header, relying on the database and
// POW behind the block processor.
func (sm *BlockProcessor) ValidateHeaderWithParent(header, parent *types.Header, checkPow, uncle bool) error {
if sm.bc.HasHeader(header.Hash()) {
return nil
}
return ValidateHeader(sm.Pow, header, parent, checkPow, uncle)
}
// See YP section 4.3.4. "Block Header Validity"
// Validates a header. Returns an error if the header is invalid.
func ValidateHeader(pow pow.PoW, header *types.Header, parent *types.Header, checkPow, uncle bool) error {
if big.NewInt(int64(len(header.Extra))).Cmp(params.MaximumExtraDataSize) == 1 {
return fmt.Errorf("Header extra data too long (%d)", len(header.Extra))
}
if uncle {
if header.Time.Cmp(common.MaxBig) == 1 {
return BlockTSTooBigErr
@ -429,7 +453,7 @@ func ValidateHeader(pow pow.PoW, header *types.Header, parent *types.Header, che
if checkPow {
// Verify the nonce of the header. Return an error if it's not valid
if !pow.Verify(types.NewBlockWithHeader(header)) {
return ValidationError("Header's nonce is invalid (= %x)", header.Nonce)
return &BlockNonceErr{Hash: header.Hash(), Number: header.Number, Nonce: header.Nonce.Uint64()}
}
}
return nil

View File

@ -70,16 +70,16 @@ func TestPutReceipt(t *testing.T) {
hash[0] = 2
receipt := new(types.Receipt)
receipt.SetLogs(vm.Logs{&vm.Log{
Address: addr,
Topics: []common.Hash{hash},
Data: []byte("hi"),
Number: 42,
TxHash: hash,
TxIndex: 0,
BlockHash: hash,
Index: 0,
}})
receipt.Logs = vm.Logs{&vm.Log{
Address: addr,
Topics: []common.Hash{hash},
Data: []byte("hi"),
BlockNumber: 42,
TxHash: hash,
TxIndex: 0,
BlockHash: hash,
Index: 0,
}}
PutReceipts(db, types.Receipts{receipt})
receipt = GetReceipt(db, common.Hash{})

View File

@ -18,10 +18,14 @@
package core
import (
crand "crypto/rand"
"errors"
"fmt"
"io"
"math"
"math/big"
mrand "math/rand"
"runtime"
"sync"
"sync/atomic"
"time"
@ -29,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
@ -36,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/pow"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/hashicorp/golang-lru"
)
@ -67,9 +73,10 @@ type BlockChain struct {
chainmu sync.RWMutex
tsmu sync.RWMutex
td *big.Int
currentBlock *types.Block
currentGasLimit *big.Int
checkpoint int // checkpoint counts towards the new checkpoint
currentHeader *types.Header // Current head of the header chain (may be above the block chain!)
currentBlock *types.Block // Current head of the block chain
currentFastBlock *types.Block // Current head of the fast-sync chain (may be above the block chain!)
headerCache *lru.Cache // Cache for the most recent block headers
bodyCache *lru.Cache // Cache for the most recent block bodies
@ -84,7 +91,8 @@ type BlockChain struct {
procInterrupt int32 // interrupt signaler for block processing
wg sync.WaitGroup
pow pow.PoW
pow pow.PoW
rand *mrand.Rand
}
func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
@ -107,6 +115,12 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl
futureBlocks: futureBlocks,
pow: pow,
}
// Seed a fast but crypto originating random generator
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
if err != nil {
return nil, err
}
bc.rand = mrand.New(mrand.NewSource(seed.Int64()))
bc.genesisBlock = bc.GetBlockByNumber(0)
if bc.genesisBlock == nil {
@ -120,20 +134,15 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl
}
glog.V(logger.Info).Infoln("WARNING: Wrote default ethereum genesis block")
}
if err := bc.setLastState(); err != nil {
if err := bc.loadLastState(); err != nil {
return nil, err
}
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
for hash, _ := range BadHashes {
if block := bc.GetBlock(hash); block != nil {
glog.V(logger.Error).Infof("Found bad hash. Reorganising chain to state %x\n", block.ParentHash().Bytes()[:4])
block = bc.GetBlock(block.ParentHash())
if block == nil {
glog.Fatal("Unable to complete. Parent block not found. Corrupted DB?")
}
bc.SetHead(block)
glog.V(logger.Error).Infoln("Chain reorg was successfull. Resuming normal operation")
if header := bc.GetHeader(hash); header != nil {
glog.V(logger.Error).Infof("Found bad hash, rewinding chain to block #%d [%x…]", header.Number, header.ParentHash[:4])
bc.SetHead(header.Number.Uint64() - 1)
glog.V(logger.Error).Infoln("Chain rewind was successful, resuming normal operation")
}
}
// Take ownership of this particular state
@ -141,30 +150,146 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl
return bc, nil
}
func (bc *BlockChain) SetHead(head *types.Block) {
// loadLastState loads the last known chain state from the database. This method
// assumes that the chain manager mutex is held.
func (self *BlockChain) loadLastState() error {
// Restore the last known head block
head := GetHeadBlockHash(self.chainDb)
if head == (common.Hash{}) {
// Corrupt or empty database, init from scratch
self.Reset()
} else {
if block := self.GetBlock(head); block != nil {
// Block found, set as the current head
self.currentBlock = block
} else {
// Corrupt or empty database, init from scratch
self.Reset()
}
}
// Restore the last known head header
self.currentHeader = self.currentBlock.Header()
if head := GetHeadHeaderHash(self.chainDb); head != (common.Hash{}) {
if header := self.GetHeader(head); header != nil {
self.currentHeader = header
}
}
// Restore the last known head fast block
self.currentFastBlock = self.currentBlock
if head := GetHeadFastBlockHash(self.chainDb); head != (common.Hash{}) {
if block := self.GetBlock(head); block != nil {
self.currentFastBlock = block
}
}
// Issue a status log and return
headerTd := self.GetTd(self.currentHeader.Hash())
blockTd := self.GetTd(self.currentBlock.Hash())
fastTd := self.GetTd(self.currentFastBlock.Hash())
glog.V(logger.Info).Infof("Last header: #%d [%x…] TD=%v", self.currentHeader.Number, self.currentHeader.Hash().Bytes()[:4], headerTd)
glog.V(logger.Info).Infof("Last block: #%d [%x…] TD=%v", self.currentBlock.Number(), self.currentBlock.Hash().Bytes()[:4], blockTd)
glog.V(logger.Info).Infof("Fast block: #%d [%x…] TD=%v", self.currentFastBlock.Number(), self.currentFastBlock.Hash().Bytes()[:4], fastTd)
return nil
}
// SetHead rewinds the local chain to a new head. In the case of headers, everything
// above the new head will be deleted and the new one set. In the case of blocks
// though, the head may be further rewound if block bodies are missing (non-archive
// nodes after a fast sync).
func (bc *BlockChain) SetHead(head uint64) {
bc.mu.Lock()
defer bc.mu.Unlock()
for block := bc.currentBlock; block != nil && block.Hash() != head.Hash(); block = bc.GetBlock(block.ParentHash()) {
DeleteBlock(bc.chainDb, block.Hash())
// Figure out the highest known canonical headers and/or blocks
height := uint64(0)
if bc.currentHeader != nil {
if hh := bc.currentHeader.Number.Uint64(); hh > height {
height = hh
}
}
if bc.currentBlock != nil {
if bh := bc.currentBlock.NumberU64(); bh > height {
height = bh
}
}
if bc.currentFastBlock != nil {
if fbh := bc.currentFastBlock.NumberU64(); fbh > height {
height = fbh
}
}
// Gather all the hashes that need deletion
drop := make(map[common.Hash]struct{})
for bc.currentHeader != nil && bc.currentHeader.Number.Uint64() > head {
drop[bc.currentHeader.Hash()] = struct{}{}
bc.currentHeader = bc.GetHeader(bc.currentHeader.ParentHash)
}
for bc.currentBlock != nil && bc.currentBlock.NumberU64() > head {
drop[bc.currentBlock.Hash()] = struct{}{}
bc.currentBlock = bc.GetBlock(bc.currentBlock.ParentHash())
}
for bc.currentFastBlock != nil && bc.currentFastBlock.NumberU64() > head {
drop[bc.currentFastBlock.Hash()] = struct{}{}
bc.currentFastBlock = bc.GetBlock(bc.currentFastBlock.ParentHash())
}
// Roll back the canonical chain numbering
for i := height; i > head; i-- {
DeleteCanonicalHash(bc.chainDb, i)
}
// Delete everything found by the above rewind
for hash, _ := range drop {
DeleteHeader(bc.chainDb, hash)
DeleteBody(bc.chainDb, hash)
DeleteTd(bc.chainDb, hash)
}
// Clear out any stale content from the caches
bc.headerCache.Purge()
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
bc.blockCache.Purge()
bc.futureBlocks.Purge()
bc.currentBlock = head
bc.setTotalDifficulty(bc.GetTd(head.Hash()))
bc.insert(head)
bc.setLastState()
// Update all computed fields to the new head
if bc.currentBlock == nil {
bc.currentBlock = bc.genesisBlock
}
if bc.currentHeader == nil {
bc.currentHeader = bc.genesisBlock.Header()
}
if bc.currentFastBlock == nil {
bc.currentFastBlock = bc.genesisBlock
}
if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil {
glog.Fatalf("failed to reset head block hash: %v", err)
}
if err := WriteHeadHeaderHash(bc.chainDb, bc.currentHeader.Hash()); err != nil {
glog.Fatalf("failed to reset head header hash: %v", err)
}
if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil {
glog.Fatalf("failed to reset head fast block hash: %v", err)
}
bc.loadLastState()
}
func (self *BlockChain) Td() *big.Int {
self.mu.RLock()
defer self.mu.RUnlock()
// FastSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
// Make sure that both the block as well at its state trie exists
block := self.GetBlock(hash)
if block == nil {
return fmt.Errorf("non existent block [%x…]", hash[:4])
}
if _, err := trie.NewSecure(block.Root(), self.chainDb); err != nil {
return err
}
// If all checks out, manually set the head block
self.mu.Lock()
self.currentBlock = block
self.mu.Unlock()
return new(big.Int).Set(self.td)
glog.V(logger.Info).Infof("committed block #%d [%x…] as new head", block.Number(), hash[:4])
return nil
}
func (self *BlockChain) GasLimit() *big.Int {
@ -181,6 +306,17 @@ func (self *BlockChain) LastBlockHash() common.Hash {
return self.currentBlock.Hash()
}
// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the blockchain's internal cache.
func (self *BlockChain) CurrentHeader() *types.Header {
self.mu.RLock()
defer self.mu.RUnlock()
return self.currentHeader
}
// CurrentBlock retrieves the current head block of the canonical chain. The
// block is retrieved from the blockchain's internal cache.
func (self *BlockChain) CurrentBlock() *types.Block {
self.mu.RLock()
defer self.mu.RUnlock()
@ -188,11 +324,20 @@ func (self *BlockChain) CurrentBlock() *types.Block {
return self.currentBlock
}
// CurrentFastBlock retrieves the current fast-sync head block of the canonical
// chain. The block is retrieved from the blockchain's internal cache.
func (self *BlockChain) CurrentFastBlock() *types.Block {
self.mu.RLock()
defer self.mu.RUnlock()
return self.currentFastBlock
}
func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) {
self.mu.RLock()
defer self.mu.RUnlock()
return new(big.Int).Set(self.td), self.currentBlock.Hash(), self.genesisBlock.Hash()
return self.GetTd(self.currentBlock.Hash()), self.currentBlock.Hash(), self.genesisBlock.Hash()
}
func (self *BlockChain) SetProcessor(proc types.BlockProcessor) {
@ -203,26 +348,6 @@ func (self *BlockChain) State() (*state.StateDB, error) {
return state.New(self.CurrentBlock().Root(), self.chainDb)
}
func (bc *BlockChain) setLastState() error {
head := GetHeadBlockHash(bc.chainDb)
if head != (common.Hash{}) {
block := bc.GetBlock(head)
if block != nil {
bc.currentBlock = block
}
} else {
bc.Reset()
}
bc.td = bc.GetTd(bc.currentBlock.Hash())
bc.currentGasLimit = CalcGasLimit(bc.currentBlock)
if glog.V(logger.Info) {
glog.Infof("Last block (#%v) %x TD=%v\n", bc.currentBlock.Number(), bc.currentBlock.Hash(), bc.td)
}
return nil
}
// Reset purges the entire blockchain, restoring it to its genesis state.
func (bc *BlockChain) Reset() {
bc.ResetWithGenesisBlock(bc.genesisBlock)
@ -231,20 +356,13 @@ func (bc *BlockChain) Reset() {
// ResetWithGenesisBlock purges the entire blockchain, restoring it to the
// specified genesis state.
func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
// Dump the entire block chain and purge the caches
bc.SetHead(0)
bc.mu.Lock()
defer bc.mu.Unlock()
// Dump the entire block chain and purge the caches
for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.ParentHash()) {
DeleteBlock(bc.chainDb, block.Hash())
}
bc.headerCache.Purge()
bc.bodyCache.Purge()
bc.bodyRLPCache.Purge()
bc.blockCache.Purge()
bc.futureBlocks.Purge()
// Prepare the genesis block and reinitialize the chain
// Prepare the genesis block and reinitialise the chain
if err := WriteTd(bc.chainDb, genesis.Hash(), genesis.Difficulty()); err != nil {
glog.Fatalf("failed to write genesis block TD: %v", err)
}
@ -254,7 +372,8 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) {
bc.genesisBlock = genesis
bc.insert(bc.genesisBlock)
bc.currentBlock = bc.genesisBlock
bc.setTotalDifficulty(genesis.Difficulty())
bc.currentHeader = bc.genesisBlock.Header()
bc.currentFastBlock = bc.genesisBlock
}
// Export writes the active chain to the given writer.
@ -290,17 +409,30 @@ func (self *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return nil
}
// insert injects a block into the current chain block chain. Note, this function
// assumes that the `mu` mutex is held!
// insert injects a new head block into the current block chain. This method
// assumes that the block is indeed a true head. It will also reset the head
// header and the head fast sync block to this very same block to prevent them
// from pointing to a possibly old canonical chain (i.e. side chain by now).
//
// Note, this function assumes that the `mu` mutex is held!
func (bc *BlockChain) insert(block *types.Block) {
// Add the block to the canonical chain number scheme and mark as the head
if err := WriteCanonicalHash(bc.chainDb, block.Hash(), block.NumberU64()); err != nil {
glog.Fatalf("failed to insert block number: %v", err)
}
if err := WriteHeadBlockHash(bc.chainDb, block.Hash()); err != nil {
glog.Fatalf("failed to insert block number: %v", err)
glog.Fatalf("failed to insert head block hash: %v", err)
}
if err := WriteHeadHeaderHash(bc.chainDb, block.Hash()); err != nil {
glog.Fatalf("failed to insert head header hash: %v", err)
}
if err := WriteHeadFastBlockHash(bc.chainDb, block.Hash()); err != nil {
glog.Fatalf("failed to insert head fast block hash: %v", err)
}
// Update the internal state with the head block
bc.currentBlock = block
bc.currentHeader = block.Header()
bc.currentFastBlock = block
}
// Accessors
@ -456,19 +588,15 @@ func (self *BlockChain) GetBlocksFromHash(hash common.Hash, n int) (blocks []*ty
return
}
func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) (uncles []*types.Header) {
// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {
uncles := []*types.Header{}
for i := 0; block != nil && i < length; i++ {
uncles = append(uncles, block.Uncles()...)
block = self.GetBlock(block.ParentHash())
}
return
}
// setTotalDifficulty updates the TD of the chain manager. Note, this function
// assumes that the `mu` mutex is held!
func (bc *BlockChain) setTotalDifficulty(td *big.Int) {
bc.td = new(big.Int).Set(td)
return uncles
}
func (bc *BlockChain) Stop() {
@ -504,6 +632,337 @@ const (
SideStatTy
)
// writeHeader writes a header into the local chain, given that its parent is
// already known. If the total difficulty of the newly inserted header becomes
// greater than the current known TD, the canonical chain is re-routed.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (self *BlockChain) writeHeader(header *types.Header) error {
self.wg.Add(1)
defer self.wg.Done()
// Calculate the total difficulty of the header
ptd := self.GetTd(header.ParentHash)
if ptd == nil {
return ParentError(header.ParentHash)
}
td := new(big.Int).Add(header.Difficulty, ptd)
// Make sure no inconsistent state is leaked during insertion
self.mu.Lock()
defer self.mu.Unlock()
// If the total difficulty is higher than our known, add it to the canonical chain
if td.Cmp(self.GetTd(self.currentHeader.Hash())) > 0 {
// Delete any canonical number assignments above the new head
for i := header.Number.Uint64() + 1; GetCanonicalHash(self.chainDb, i) != (common.Hash{}); i++ {
DeleteCanonicalHash(self.chainDb, i)
}
// Overwrite any stale canonical number assignments
head := self.GetHeader(header.ParentHash)
for GetCanonicalHash(self.chainDb, head.Number.Uint64()) != head.Hash() {
WriteCanonicalHash(self.chainDb, head.Hash(), head.Number.Uint64())
head = self.GetHeader(head.ParentHash)
}
// Extend the canonical chain with the new header
if err := WriteCanonicalHash(self.chainDb, header.Hash(), header.Number.Uint64()); err != nil {
glog.Fatalf("failed to insert header number: %v", err)
}
if err := WriteHeadHeaderHash(self.chainDb, header.Hash()); err != nil {
glog.Fatalf("failed to insert head header hash: %v", err)
}
self.currentHeader = types.CopyHeader(header)
}
// Irrelevant of the canonical status, write the header itself to the database
if err := WriteTd(self.chainDb, header.Hash(), td); err != nil {
glog.Fatalf("failed to write header total difficulty: %v", err)
}
if err := WriteHeader(self.chainDb, header); err != nil {
glog.Fatalf("filed to write header contents: %v", err)
}
return nil
}
// InsertHeaderChain attempts to insert the given header chain in to the local
// chain, possibly creating a reorg. If an error is returned, it will return the
// index number of the failing header as well an error describing what went wrong.
//
// The verify parameter can be used to fine tune whether nonce verification
// should be done or not. The reason behind the optional check is because some
// of the header retrieval mechanisms already need to verfy nonces, as well as
// because nonces can be verified sparsely, not needing to check each.
func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
self.wg.Add(1)
defer self.wg.Done()
// Make sure only one thread manipulates the chain at once
self.chainmu.Lock()
defer self.chainmu.Unlock()
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
start := time.Now()
// Generate the list of headers that should be POW verified
verify := make([]bool, len(chain))
for i := 0; i < len(verify)/checkFreq; i++ {
index := i*checkFreq + self.rand.Intn(checkFreq)
if index >= len(verify) {
index = len(verify) - 1
}
verify[index] = true
}
verify[len(verify)-1] = true // Last should always be verified to avoid junk
// Create the header verification task queue and worker functions
tasks := make(chan int, len(chain))
for i := 0; i < len(chain); i++ {
tasks <- i
}
close(tasks)
errs, failed := make([]error, len(tasks)), int32(0)
process := func(worker int) {
for index := range tasks {
header, hash := chain[index], chain[index].Hash()
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&self.procInterrupt) == 1 {
return
}
if atomic.LoadInt32(&failed) > 0 {
return
}
// Short circuit if the header is bad or already known
if BadHashes[hash] {
errs[index] = BadHashError(hash)
atomic.AddInt32(&failed, 1)
return
}
if self.HasHeader(hash) {
continue
}
// Verify that the header honors the chain parameters
checkPow := verify[index]
var err error
if index == 0 {
err = self.processor.ValidateHeader(header, checkPow, false)
} else {
err = self.processor.ValidateHeaderWithParent(header, chain[index-1], checkPow, false)
}
if err != nil {
errs[index] = err
atomic.AddInt32(&failed, 1)
return
}
}
}
// Start as many worker threads as goroutines allowed
pending := new(sync.WaitGroup)
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
pending.Add(1)
go func(id int) {
defer pending.Done()
process(id)
}(i)
}
pending.Wait()
// If anything failed, report
if failed > 0 {
for i, err := range errs {
if err != nil {
return i, err
}
}
}
// All headers passed verification, import them into the database
for i, header := range chain {
// Short circuit insertion if shutting down
if atomic.LoadInt32(&self.procInterrupt) == 1 {
glog.V(logger.Debug).Infoln("premature abort during header chain processing")
break
}
hash := header.Hash()
// If the header's already known, skip it, otherwise store
if self.HasHeader(hash) {
stats.ignored++
continue
}
if err := self.writeHeader(header); err != nil {
return i, err
}
stats.processed++
}
// Report some public statistics so the user has a clue what's going on
first, last := chain[0], chain[len(chain)-1]
glog.V(logger.Info).Infof("imported %d header(s) (%d ignored) in %v. #%v [%x… / %x…]", stats.processed, stats.ignored,
time.Since(start), last.Number, first.Hash().Bytes()[:4], last.Hash().Bytes()[:4])
return 0, nil
}
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (self *BlockChain) Rollback(chain []common.Hash) {
self.mu.Lock()
defer self.mu.Unlock()
for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
if self.currentHeader.Hash() == hash {
self.currentHeader = self.GetHeader(self.currentHeader.ParentHash)
WriteHeadHeaderHash(self.chainDb, self.currentHeader.Hash())
}
if self.currentFastBlock.Hash() == hash {
self.currentFastBlock = self.GetBlock(self.currentFastBlock.ParentHash())
WriteHeadFastBlockHash(self.chainDb, self.currentFastBlock.Hash())
}
if self.currentBlock.Hash() == hash {
self.currentBlock = self.GetBlock(self.currentBlock.ParentHash())
WriteHeadBlockHash(self.chainDb, self.currentBlock.Hash())
}
}
}
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
self.wg.Add(1)
defer self.wg.Done()
// Collect some import statistics to report on
stats := struct{ processed, ignored int32 }{}
start := time.Now()
// Create the block importing task queue and worker functions
tasks := make(chan int, len(blockChain))
for i := 0; i < len(blockChain) && i < len(receiptChain); i++ {
tasks <- i
}
close(tasks)
errs, failed := make([]error, len(tasks)), int32(0)
process := func(worker int) {
for index := range tasks {
block, receipts := blockChain[index], receiptChain[index]
// Short circuit insertion if shutting down or processing failed
if atomic.LoadInt32(&self.procInterrupt) == 1 {
return
}
if atomic.LoadInt32(&failed) > 0 {
return
}
// Short circuit if the owner header is unknown
if !self.HasHeader(block.Hash()) {
errs[index] = fmt.Errorf("containing header #%d [%x…] unknown", block.Number(), block.Hash().Bytes()[:4])
atomic.AddInt32(&failed, 1)
return
}
// Skip if the entire data is already known
if self.HasBlock(block.Hash()) {
atomic.AddInt32(&stats.ignored, 1)
continue
}
// Compute all the non-consensus fields of the receipts
transactions, logIndex := block.Transactions(), uint(0)
for j := 0; j < len(receipts); j++ {
// The transaction hash can be retrieved from the transaction itself
receipts[j].TxHash = transactions[j].Hash()
// The contract address can be derived from the transaction itself
if MessageCreatesContract(transactions[j]) {
from, _ := transactions[j].From()
receipts[j].ContractAddress = crypto.CreateAddress(from, transactions[j].Nonce())
}
// The used gas can be calculated based on previous receipts
if j == 0 {
receipts[j].GasUsed = new(big.Int).Set(receipts[j].CumulativeGasUsed)
} else {
receipts[j].GasUsed = new(big.Int).Sub(receipts[j].CumulativeGasUsed, receipts[j-1].CumulativeGasUsed)
}
// The derived log fields can simply be set from the block and transaction
for k := 0; k < len(receipts[j].Logs); k++ {
receipts[j].Logs[k].BlockNumber = block.NumberU64()
receipts[j].Logs[k].BlockHash = block.Hash()
receipts[j].Logs[k].TxHash = receipts[j].TxHash
receipts[j].Logs[k].TxIndex = uint(j)
receipts[j].Logs[k].Index = logIndex
logIndex++
}
}
// Write all the data out into the database
if err := WriteBody(self.chainDb, block.Hash(), &types.Body{block.Transactions(), block.Uncles()}); err != nil {
errs[index] = fmt.Errorf("failed to write block body: %v", err)
atomic.AddInt32(&failed, 1)
glog.Fatal(errs[index])
return
}
if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
errs[index] = fmt.Errorf("failed to write block receipts: %v", err)
atomic.AddInt32(&failed, 1)
glog.Fatal(errs[index])
return
}
if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
errs[index] = fmt.Errorf("failed to write log blooms: %v", err)
atomic.AddInt32(&failed, 1)
glog.Fatal(errs[index])
return
}
atomic.AddInt32(&stats.processed, 1)
}
}
// Start as many worker threads as goroutines allowed
pending := new(sync.WaitGroup)
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
pending.Add(1)
go func(id int) {
defer pending.Done()
process(id)
}(i)
}
pending.Wait()
// If anything failed, report
if failed > 0 {
for i, err := range errs {
if err != nil {
return i, err
}
}
}
if atomic.LoadInt32(&self.procInterrupt) == 1 {
glog.V(logger.Debug).Infoln("premature abort during receipt chain processing")
return 0, nil
}
// Update the head fast sync block if better
self.mu.Lock()
head := blockChain[len(errs)-1]
if self.GetTd(self.currentFastBlock.Hash()).Cmp(self.GetTd(head.Hash())) < 0 {
if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil {
glog.Fatalf("failed to update head fast block hash: %v", err)
}
self.currentFastBlock = head
}
self.mu.Unlock()
// Report some public statistics so the user has a clue what's going on
first, last := blockChain[0], blockChain[len(blockChain)-1]
glog.V(logger.Info).Infof("imported %d receipt(s) (%d ignored) in %v. #%d [%x… / %x…]", stats.processed, stats.ignored,
time.Since(start), last.Number(), first.Hash().Bytes()[:4], last.Hash().Bytes()[:4])
return 0, nil
}
// WriteBlock writes the block to the chain.
func (self *BlockChain) WriteBlock(block *types.Block) (status writeStatus, err error) {
self.wg.Add(1)
@ -516,38 +975,31 @@ func (self *BlockChain) WriteBlock(block *types.Block) (status writeStatus, err
}
td := new(big.Int).Add(block.Difficulty(), ptd)
self.mu.RLock()
cblock := self.currentBlock
self.mu.RUnlock()
// Make sure no inconsistent state is leaked during insertion
self.mu.Lock()
defer self.mu.Unlock()
// Compare the TD of the last known block in the canonical chain to make sure it's greater.
// At this point it's possible that a different chain (fork) becomes the new canonical chain.
if td.Cmp(self.Td()) > 0 {
// chain fork
if block.ParentHash() != cblock.Hash() {
// during split we merge two different chains and create the new canonical chain
err := self.reorg(cblock, block)
if err != nil {
// If the total difficulty is higher than our known, add it to the canonical chain
if td.Cmp(self.GetTd(self.currentBlock.Hash())) > 0 {
// Reorganize the chain if the parent is not the head block
if block.ParentHash() != self.currentBlock.Hash() {
if err := self.reorg(self.currentBlock, block); err != nil {
return NonStatTy, err
}
}
status = CanonStatTy
self.mu.Lock()
self.setTotalDifficulty(td)
// Insert the block as the new head of the chain
self.insert(block)
self.mu.Unlock()
status = CanonStatTy
} else {
status = SideStatTy
}
// Irrelevant of the canonical status, write the block itself to the database
if err := WriteTd(self.chainDb, block.Hash(), td); err != nil {
glog.Fatalf("failed to write block total difficulty: %v", err)
}
if err := WriteBlock(self.chainDb, block); err != nil {
glog.Fatalf("filed to write block contents: %v", err)
}
// Delete from future blocks
self.futureBlocks.Remove(block.Hash())
return
@ -580,7 +1032,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
txcount := 0
for i, block := range chain {
if atomic.LoadInt32(&self.procInterrupt) == 1 {
glog.V(logger.Debug).Infoln("Premature abort during chain processing")
glog.V(logger.Debug).Infoln("Premature abort during block chain processing")
break
}
@ -636,7 +1088,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
return i, err
}
if err := PutBlockReceipts(self.chainDb, block, receipts); err != nil {
if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
glog.V(logger.Warn).Infoln("error writing block receipts:", err)
}
@ -691,9 +1143,6 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// to be part of the new canonical chain and accumulates potential missing transactions and post an
// event about them
func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
self.mu.Lock()
defer self.mu.Unlock()
var (
newChain types.Blocks
commonBlock *types.Block
@ -788,8 +1237,7 @@ func (self *BlockChain) postChainEvents(events []interface{}) {
if event, ok := event.(ChainEvent); ok {
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
// and in most cases isn't even necessary.
if self.currentBlock.Hash() == event.Hash {
self.currentGasLimit = CalcGasLimit(event.Block)
if self.LastBlockHash() == event.Hash {
self.eventMux.Post(ChainHeadEvent{event.Block})
}
}

View File

@ -64,44 +64,58 @@ func theBlockChain(db ethdb.Database, t *testing.T) *BlockChain {
}
// Test fork of length N starting from block i
func testFork(t *testing.T, bman *BlockProcessor, i, N int, f func(td1, td2 *big.Int)) {
// switch databases to process the new chain
db, err := ethdb.NewMemDatabase()
if err != nil {
t.Fatal("Failed to create db:", err)
}
// copy old chain up to i into new db with deterministic canonical
bman2, err := newCanonical(i, db)
func testFork(t *testing.T, processor *BlockProcessor, i, n int, full bool, comparator func(td1, td2 *big.Int)) {
// Copy old chain up to #i into a new db
db, processor2, err := newCanonical(i, full)
if err != nil {
t.Fatal("could not make new canonical in testFork", err)
}
// assert the bmans have the same block at i
bi1 := bman.bc.GetBlockByNumber(uint64(i)).Hash()
bi2 := bman2.bc.GetBlockByNumber(uint64(i)).Hash()
if bi1 != bi2 {
fmt.Printf("%+v\n%+v\n\n", bi1, bi2)
t.Fatal("chains do not have the same hash at height", i)
// Assert the chains have the same header/block at #i
var hash1, hash2 common.Hash
if full {
hash1 = processor.bc.GetBlockByNumber(uint64(i)).Hash()
hash2 = processor2.bc.GetBlockByNumber(uint64(i)).Hash()
} else {
hash1 = processor.bc.GetHeaderByNumber(uint64(i)).Hash()
hash2 = processor2.bc.GetHeaderByNumber(uint64(i)).Hash()
}
bman2.bc.SetProcessor(bman2)
// extend the fork
parent := bman2.bc.CurrentBlock()
chainB := makeChain(parent, N, db, forkSeed)
_, err = bman2.bc.InsertChain(chainB)
if err != nil {
t.Fatal("Insert chain error for fork:", err)
if hash1 != hash2 {
t.Errorf("chain content mismatch at %d: have hash %v, want hash %v", i, hash2, hash1)
}
tdpre := bman.bc.Td()
// Test the fork's blocks on the original chain
td, err := testChain(chainB, bman)
if err != nil {
t.Fatal("expected chainB not to give errors:", err)
// Extend the newly created chain
var (
blockChainB []*types.Block
headerChainB []*types.Header
)
if full {
blockChainB = makeBlockChain(processor2.bc.CurrentBlock(), n, db, forkSeed)
if _, err := processor2.bc.InsertChain(blockChainB); err != nil {
t.Fatalf("failed to insert forking chain: %v", err)
}
} else {
headerChainB = makeHeaderChain(processor2.bc.CurrentHeader(), n, db, forkSeed)
if _, err := processor2.bc.InsertHeaderChain(headerChainB, 1); err != nil {
t.Fatalf("failed to insert forking chain: %v", err)
}
}
// Compare difficulties
f(tdpre, td)
// Sanity check that the forked chain can be imported into the original
var tdPre, tdPost *big.Int
// Loop over parents making sure reconstruction is done properly
if full {
tdPre = processor.bc.GetTd(processor.bc.CurrentBlock().Hash())
if err := testBlockChainImport(blockChainB, processor); err != nil {
t.Fatalf("failed to import forked block chain: %v", err)
}
tdPost = processor.bc.GetTd(blockChainB[len(blockChainB)-1].Hash())
} else {
tdPre = processor.bc.GetTd(processor.bc.CurrentHeader().Hash())
if err := testHeaderChainImport(headerChainB, processor); err != nil {
t.Fatalf("failed to import forked header chain: %v", err)
}
tdPost = processor.bc.GetTd(headerChainB[len(headerChainB)-1].Hash())
}
// Compare the total difficulties of the chains
comparator(tdPre, tdPost)
}
func printChain(bc *BlockChain) {
@ -111,22 +125,41 @@ func printChain(bc *BlockChain) {
}
}
// process blocks against a chain
func testChain(chainB types.Blocks, bman *BlockProcessor) (*big.Int, error) {
for _, block := range chainB {
_, _, err := bman.bc.processor.Process(block)
if err != nil {
// testBlockChainImport tries to process a chain of blocks, writing them into
// the database if successful.
func testBlockChainImport(chain []*types.Block, processor *BlockProcessor) error {
for _, block := range chain {
// Try and process the block
if _, _, err := processor.Process(block); err != nil {
if IsKnownBlockErr(err) {
continue
}
return nil, err
return err
}
bman.bc.mu.Lock()
WriteTd(bman.bc.chainDb, block.Hash(), new(big.Int).Add(block.Difficulty(), bman.bc.GetTd(block.ParentHash())))
WriteBlock(bman.bc.chainDb, block)
bman.bc.mu.Unlock()
// Manually insert the block into the database, but don't reorganize (allows subsequent testing)
processor.bc.mu.Lock()
WriteTd(processor.chainDb, block.Hash(), new(big.Int).Add(block.Difficulty(), processor.bc.GetTd(block.ParentHash())))
WriteBlock(processor.chainDb, block)
processor.bc.mu.Unlock()
}
return bman.bc.GetTd(chainB[len(chainB)-1].Hash()), nil
return nil
}
// testHeaderChainImport tries to process a chain of header, writing them into
// the database if successful.
func testHeaderChainImport(chain []*types.Header, processor *BlockProcessor) error {
for _, header := range chain {
// Try and validate the header
if err := processor.ValidateHeader(header, false, false); err != nil {
return err
}
// Manually insert the header into the database, but don't reorganize (allows subsequent testing)
processor.bc.mu.Lock()
WriteTd(processor.chainDb, header.Hash(), new(big.Int).Add(header.Difficulty, processor.bc.GetTd(header.ParentHash)))
WriteHeader(processor.chainDb, header)
processor.bc.mu.Unlock()
}
return nil
}
func loadChain(fn string, t *testing.T) (types.Blocks, error) {
@ -154,139 +187,147 @@ func insertChain(done chan bool, blockchain *BlockChain, chain types.Blocks, t *
}
func TestLastBlock(t *testing.T) {
db, err := ethdb.NewMemDatabase()
if err != nil {
t.Fatal("Failed to create db:", err)
}
db, _ := ethdb.NewMemDatabase()
bchain := theBlockChain(db, t)
block := makeChain(bchain.CurrentBlock(), 1, db, 0)[0]
block := makeBlockChain(bchain.CurrentBlock(), 1, db, 0)[0]
bchain.insert(block)
if block.Hash() != GetHeadBlockHash(db) {
t.Errorf("Write/Get HeadBlockHash failed")
}
}
func TestExtendCanonical(t *testing.T) {
CanonicalLength := 5
db, err := ethdb.NewMemDatabase()
// Tests that given a starting canonical chain of a given size, it can be extended
// with various length chains.
func TestExtendCanonicalHeaders(t *testing.T) { testExtendCanonical(t, false) }
func TestExtendCanonicalBlocks(t *testing.T) { testExtendCanonical(t, true) }
func testExtendCanonical(t *testing.T, full bool) {
length := 5
// Make first chain starting from genesis
_, processor, err := newCanonical(length, full)
if err != nil {
t.Fatal("Failed to create db:", err)
t.Fatalf("failed to make new canonical chain: %v", err)
}
// make first chain starting from genesis
bman, err := newCanonical(CanonicalLength, db)
if err != nil {
t.Fatal("Could not make new canonical chain:", err)
}
f := func(td1, td2 *big.Int) {
// Define the difficulty comparator
better := func(td1, td2 *big.Int) {
if td2.Cmp(td1) <= 0 {
t.Error("expected chainB to have higher difficulty. Got", td2, "expected more than", td1)
t.Errorf("total difficulty mismatch: have %v, expected more than %v", td2, td1)
}
}
// Start fork from current height (CanonicalLength)
testFork(t, bman, CanonicalLength, 1, f)
testFork(t, bman, CanonicalLength, 2, f)
testFork(t, bman, CanonicalLength, 5, f)
testFork(t, bman, CanonicalLength, 10, f)
// Start fork from current height
testFork(t, processor, length, 1, full, better)
testFork(t, processor, length, 2, full, better)
testFork(t, processor, length, 5, full, better)
testFork(t, processor, length, 10, full, better)
}
func TestShorterFork(t *testing.T) {
db, err := ethdb.NewMemDatabase()
// Tests that given a starting canonical chain of a given size, creating shorter
// forks do not take canonical ownership.
func TestShorterForkHeaders(t *testing.T) { testShorterFork(t, false) }
func TestShorterForkBlocks(t *testing.T) { testShorterFork(t, true) }
func testShorterFork(t *testing.T, full bool) {
length := 10
// Make first chain starting from genesis
_, processor, err := newCanonical(length, full)
if err != nil {
t.Fatal("Failed to create db:", err)
t.Fatalf("failed to make new canonical chain: %v", err)
}
// make first chain starting from genesis
bman, err := newCanonical(10, db)
if err != nil {
t.Fatal("Could not make new canonical chain:", err)
}
f := func(td1, td2 *big.Int) {
// Define the difficulty comparator
worse := func(td1, td2 *big.Int) {
if td2.Cmp(td1) >= 0 {
t.Error("expected chainB to have lower difficulty. Got", td2, "expected less than", td1)
t.Errorf("total difficulty mismatch: have %v, expected less than %v", td2, td1)
}
}
// Sum of numbers must be less than 10
// for this to be a shorter fork
testFork(t, bman, 0, 3, f)
testFork(t, bman, 0, 7, f)
testFork(t, bman, 1, 1, f)
testFork(t, bman, 1, 7, f)
testFork(t, bman, 5, 3, f)
testFork(t, bman, 5, 4, f)
// Sum of numbers must be less than `length` for this to be a shorter fork
testFork(t, processor, 0, 3, full, worse)
testFork(t, processor, 0, 7, full, worse)
testFork(t, processor, 1, 1, full, worse)
testFork(t, processor, 1, 7, full, worse)
testFork(t, processor, 5, 3, full, worse)
testFork(t, processor, 5, 4, full, worse)
}
func TestLongerFork(t *testing.T) {
db, err := ethdb.NewMemDatabase()
// Tests that given a starting canonical chain of a given size, creating longer
// forks do take canonical ownership.
func TestLongerForkHeaders(t *testing.T) { testLongerFork(t, false) }
func TestLongerForkBlocks(t *testing.T) { testLongerFork(t, true) }
func testLongerFork(t *testing.T, full bool) {
length := 10
// Make first chain starting from genesis
_, processor, err := newCanonical(length, full)
if err != nil {
t.Fatal("Failed to create db:", err)
t.Fatalf("failed to make new canonical chain: %v", err)
}
// make first chain starting from genesis
bman, err := newCanonical(10, db)
if err != nil {
t.Fatal("Could not make new canonical chain:", err)
}
f := func(td1, td2 *big.Int) {
// Define the difficulty comparator
better := func(td1, td2 *big.Int) {
if td2.Cmp(td1) <= 0 {
t.Error("expected chainB to have higher difficulty. Got", td2, "expected more than", td1)
t.Errorf("total difficulty mismatch: have %v, expected more than %v", td2, td1)
}
}
// Sum of numbers must be greater than 10
// for this to be a longer fork
testFork(t, bman, 0, 11, f)
testFork(t, bman, 0, 15, f)
testFork(t, bman, 1, 10, f)
testFork(t, bman, 1, 12, f)
testFork(t, bman, 5, 6, f)
testFork(t, bman, 5, 8, f)
// Sum of numbers must be greater than `length` for this to be a longer fork
testFork(t, processor, 0, 11, full, better)
testFork(t, processor, 0, 15, full, better)
testFork(t, processor, 1, 10, full, better)
testFork(t, processor, 1, 12, full, better)
testFork(t, processor, 5, 6, full, better)
testFork(t, processor, 5, 8, full, better)
}
func TestEqualFork(t *testing.T) {
db, err := ethdb.NewMemDatabase()
// Tests that given a starting canonical chain of a given size, creating equal
// forks do take canonical ownership.
func TestEqualForkHeaders(t *testing.T) { testEqualFork(t, false) }
func TestEqualForkBlocks(t *testing.T) { testEqualFork(t, true) }
func testEqualFork(t *testing.T, full bool) {
length := 10
// Make first chain starting from genesis
_, processor, err := newCanonical(length, full)
if err != nil {
t.Fatal("Failed to create db:", err)
t.Fatalf("failed to make new canonical chain: %v", err)
}
bman, err := newCanonical(10, db)
if err != nil {
t.Fatal("Could not make new canonical chain:", err)
}
f := func(td1, td2 *big.Int) {
// Define the difficulty comparator
equal := func(td1, td2 *big.Int) {
if td2.Cmp(td1) != 0 {
t.Error("expected chainB to have equal difficulty. Got", td2, "expected ", td1)
t.Errorf("total difficulty mismatch: have %v, want %v", td2, td1)
}
}
// Sum of numbers must be equal to 10
// for this to be an equal fork
testFork(t, bman, 0, 10, f)
testFork(t, bman, 1, 9, f)
testFork(t, bman, 2, 8, f)
testFork(t, bman, 5, 5, f)
testFork(t, bman, 6, 4, f)
testFork(t, bman, 9, 1, f)
// Sum of numbers must be equal to `length` for this to be an equal fork
testFork(t, processor, 0, 10, full, equal)
testFork(t, processor, 1, 9, full, equal)
testFork(t, processor, 2, 8, full, equal)
testFork(t, processor, 5, 5, full, equal)
testFork(t, processor, 6, 4, full, equal)
testFork(t, processor, 9, 1, full, equal)
}
func TestBrokenChain(t *testing.T) {
db, err := ethdb.NewMemDatabase()
// Tests that chains missing links do not get accepted by the processor.
func TestBrokenHeaderChain(t *testing.T) { testBrokenChain(t, false) }
func TestBrokenBlockChain(t *testing.T) { testBrokenChain(t, true) }
func testBrokenChain(t *testing.T, full bool) {
// Make chain starting from genesis
db, processor, err := newCanonical(10, full)
if err != nil {
t.Fatal("Failed to create db:", err)
t.Fatalf("failed to make new canonical chain: %v", err)
}
bman, err := newCanonical(10, db)
if err != nil {
t.Fatal("Could not make new canonical chain:", err)
}
db2, err := ethdb.NewMemDatabase()
if err != nil {
t.Fatal("Failed to create db:", err)
}
bman2, err := newCanonical(10, db2)
if err != nil {
t.Fatal("Could not make new canonical chain:", err)
}
bman2.bc.SetProcessor(bman2)
parent := bman2.bc.CurrentBlock()
chainB := makeChain(parent, 5, db2, forkSeed)
chainB = chainB[1:]
_, err = testChain(chainB, bman)
if err == nil {
t.Error("expected broken chain to return error")
// Create a forked chain, and try to insert with a missing link
if full {
chain := makeBlockChain(processor.bc.CurrentBlock(), 5, db, forkSeed)[1:]
if err := testBlockChainImport(chain, processor); err == nil {
t.Errorf("broken block chain not reported")
}
} else {
chain := makeHeaderChain(processor.bc.CurrentHeader(), 5, db, forkSeed)[1:]
if err := testHeaderChainImport(chain, processor); err == nil {
t.Errorf("broken header chain not reported")
}
}
}
@ -374,15 +415,29 @@ func TestChainMultipleInsertions(t *testing.T) {
type bproc struct{}
func (bproc) Process(*types.Block) (vm.Logs, types.Receipts, error) { return nil, nil, nil }
func (bproc) Process(*types.Block) (vm.Logs, types.Receipts, error) { return nil, nil, nil }
func (bproc) ValidateHeader(*types.Header, bool, bool) error { return nil }
func (bproc) ValidateHeaderWithParent(*types.Header, *types.Header, bool, bool) error { return nil }
func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block {
func makeHeaderChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Header {
blocks := makeBlockChainWithDiff(genesis, d, seed)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
return headers
}
func makeBlockChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block {
var chain []*types.Block
for i, difficulty := range d {
header := &types.Header{
Coinbase: common.Address{seed},
Number: big.NewInt(int64(i + 1)),
Difficulty: big.NewInt(int64(difficulty)),
Coinbase: common.Address{seed},
Number: big.NewInt(int64(i + 1)),
Difficulty: big.NewInt(int64(difficulty)),
UncleHash: types.EmptyUncleHash,
TxHash: types.EmptyRootHash,
ReceiptHash: types.EmptyRootHash,
}
if i == 0 {
header.ParentHash = genesis.Hash()
@ -397,7 +452,7 @@ func makeChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.Block
func chm(genesis *types.Block, db ethdb.Database) *BlockChain {
var eventMux event.TypeMux
bc := &BlockChain{chainDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}}
bc := &BlockChain{chainDb: db, genesisBlock: genesis, eventMux: &eventMux, pow: FakePow{}, rand: rand.New(rand.NewSource(0))}
bc.headerCache, _ = lru.New(100)
bc.bodyCache, _ = lru.New(100)
bc.bodyRLPCache, _ = lru.New(100)
@ -410,147 +465,381 @@ func chm(genesis *types.Block, db ethdb.Database) *BlockChain {
return bc
}
func TestReorgLongest(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
// Tests that reorganizing a long difficult chain after a short easy one
// overwrites the canonical numbers and links in the database.
func TestReorgLongHeaders(t *testing.T) { testReorgLong(t, false) }
func TestReorgLongBlocks(t *testing.T) { testReorgLong(t, true) }
genesis, err := WriteTestNetGenesisBlock(db, 0)
if err != nil {
t.Error(err)
t.FailNow()
}
func testReorgLong(t *testing.T, full bool) {
testReorg(t, []int{1, 2, 4}, []int{1, 2, 3, 4}, 10, full)
}
// Tests that reorganizing a short difficult chain after a long easy one
// overwrites the canonical numbers and links in the database.
func TestReorgShortHeaders(t *testing.T) { testReorgShort(t, false) }
func TestReorgShortBlocks(t *testing.T) { testReorgShort(t, true) }
func testReorgShort(t *testing.T, full bool) {
testReorg(t, []int{1, 2, 3, 4}, []int{1, 10}, 11, full)
}
func testReorg(t *testing.T, first, second []int, td int64, full bool) {
// Create a pristine block chain
db, _ := ethdb.NewMemDatabase()
genesis, _ := WriteTestNetGenesisBlock(db, 0)
bc := chm(genesis, db)
chain1 := makeChainWithDiff(genesis, []int{1, 2, 4}, 10)
chain2 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 11)
bc.InsertChain(chain1)
bc.InsertChain(chain2)
prev := bc.CurrentBlock()
for block := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 1); block.NumberU64() != 0; prev, block = block, bc.GetBlockByNumber(block.NumberU64()-1) {
if prev.ParentHash() != block.Hash() {
t.Errorf("parent hash mismatch %x - %x", prev.ParentHash(), block.Hash())
// Insert an easy and a difficult chain afterwards
if full {
bc.InsertChain(makeBlockChainWithDiff(genesis, first, 11))
bc.InsertChain(makeBlockChainWithDiff(genesis, second, 22))
} else {
bc.InsertHeaderChain(makeHeaderChainWithDiff(genesis, first, 11), 1)
bc.InsertHeaderChain(makeHeaderChainWithDiff(genesis, second, 22), 1)
}
// Check that the chain is valid number and link wise
if full {
prev := bc.CurrentBlock()
for block := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 1); block.NumberU64() != 0; prev, block = block, bc.GetBlockByNumber(block.NumberU64()-1) {
if prev.ParentHash() != block.Hash() {
t.Errorf("parent block hash mismatch: have %x, want %x", prev.ParentHash(), block.Hash())
}
}
} else {
prev := bc.CurrentHeader()
for header := bc.GetHeaderByNumber(bc.CurrentHeader().Number.Uint64() - 1); header.Number.Uint64() != 0; prev, header = header, bc.GetHeaderByNumber(header.Number.Uint64()-1) {
if prev.ParentHash != header.Hash() {
t.Errorf("parent header hash mismatch: have %x, want %x", prev.ParentHash, header.Hash())
}
}
}
// Make sure the chain total difficulty is the correct one
want := new(big.Int).Add(genesis.Difficulty(), big.NewInt(td))
if full {
if have := bc.GetTd(bc.CurrentBlock().Hash()); have.Cmp(want) != 0 {
t.Errorf("total difficulty mismatch: have %v, want %v", have, want)
}
} else {
if have := bc.GetTd(bc.CurrentHeader().Hash()); have.Cmp(want) != 0 {
t.Errorf("total difficulty mismatch: have %v, want %v", have, want)
}
}
}
func TestBadHashes(t *testing.T) {
// Tests that the insertion functions detect banned hashes.
func TestBadHeaderHashes(t *testing.T) { testBadHashes(t, false) }
func TestBadBlockHashes(t *testing.T) { testBadHashes(t, true) }
func testBadHashes(t *testing.T, full bool) {
// Create a pristine block chain
db, _ := ethdb.NewMemDatabase()
genesis, err := WriteTestNetGenesisBlock(db, 0)
if err != nil {
t.Error(err)
t.FailNow()
}
genesis, _ := WriteTestNetGenesisBlock(db, 0)
bc := chm(genesis, db)
chain := makeChainWithDiff(genesis, []int{1, 2, 4}, 10)
BadHashes[chain[2].Header().Hash()] = true
_, err = bc.InsertChain(chain)
// Create a chain, ban a hash and try to import
var err error
if full {
blocks := makeBlockChainWithDiff(genesis, []int{1, 2, 4}, 10)
BadHashes[blocks[2].Header().Hash()] = true
_, err = bc.InsertChain(blocks)
} else {
headers := makeHeaderChainWithDiff(genesis, []int{1, 2, 4}, 10)
BadHashes[headers[2].Hash()] = true
_, err = bc.InsertHeaderChain(headers, 1)
}
if !IsBadHashError(err) {
t.Errorf("error mismatch: want: BadHashError, have: %v", err)
}
}
func TestReorgBadHashes(t *testing.T) {
// Tests that bad hashes are detected on boot, and the chan rolled back to a
// good state prior to the bad hash.
func TestReorgBadHeaderHashes(t *testing.T) { testReorgBadHashes(t, false) }
func TestReorgBadBlockHashes(t *testing.T) { testReorgBadHashes(t, true) }
func testReorgBadHashes(t *testing.T, full bool) {
// Create a pristine block chain
db, _ := ethdb.NewMemDatabase()
genesis, err := WriteTestNetGenesisBlock(db, 0)
if err != nil {
t.Error(err)
t.FailNow()
}
genesis, _ := WriteTestNetGenesisBlock(db, 0)
bc := chm(genesis, db)
chain := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 11)
bc.InsertChain(chain)
// Create a chain, import and ban aferwards
headers := makeHeaderChainWithDiff(genesis, []int{1, 2, 3, 4}, 10)
blocks := makeBlockChainWithDiff(genesis, []int{1, 2, 3, 4}, 10)
if chain[3].Header().Hash() != bc.LastBlockHash() {
t.Errorf("last block hash mismatch: want: %x, have: %x", chain[3].Header().Hash(), bc.LastBlockHash())
if full {
if _, err := bc.InsertChain(blocks); err != nil {
t.Fatalf("failed to import blocks: %v", err)
}
if bc.CurrentBlock().Hash() != blocks[3].Hash() {
t.Errorf("last block hash mismatch: have: %x, want %x", bc.CurrentBlock().Hash(), blocks[3].Header().Hash())
}
BadHashes[blocks[3].Header().Hash()] = true
defer func() { delete(BadHashes, blocks[3].Header().Hash()) }()
} else {
if _, err := bc.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to import headers: %v", err)
}
if bc.CurrentHeader().Hash() != headers[3].Hash() {
t.Errorf("last header hash mismatch: have: %x, want %x", bc.CurrentHeader().Hash(), headers[3].Hash())
}
BadHashes[headers[3].Hash()] = true
defer func() { delete(BadHashes, headers[3].Hash()) }()
}
// NewChainManager should check BadHashes when loading it db
BadHashes[chain[3].Header().Hash()] = true
var eventMux event.TypeMux
ncm, err := NewBlockChain(db, FakePow{}, &eventMux)
// Create a new chain manager and check it rolled back the state
ncm, err := NewBlockChain(db, FakePow{}, new(event.TypeMux))
if err != nil {
t.Errorf("NewChainManager err: %s", err)
t.Fatalf("failed to create new chain manager: %v", err)
}
// check it set head to (valid) parent of bad hash block
if chain[2].Header().Hash() != ncm.LastBlockHash() {
t.Errorf("last block hash mismatch: want: %x, have: %x", chain[2].Header().Hash(), ncm.LastBlockHash())
}
if chain[2].Header().GasLimit.Cmp(ncm.GasLimit()) != 0 {
t.Errorf("current block gasLimit mismatch: want: %x, have: %x", chain[2].Header().GasLimit, ncm.GasLimit())
}
}
func TestReorgShortest(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
genesis, err := WriteTestNetGenesisBlock(db, 0)
if err != nil {
t.Error(err)
t.FailNow()
}
bc := chm(genesis, db)
chain1 := makeChainWithDiff(genesis, []int{1, 2, 3, 4}, 10)
chain2 := makeChainWithDiff(genesis, []int{1, 10}, 11)
bc.InsertChain(chain1)
bc.InsertChain(chain2)
prev := bc.CurrentBlock()
for block := bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 1); block.NumberU64() != 0; prev, block = block, bc.GetBlockByNumber(block.NumberU64()-1) {
if prev.ParentHash() != block.Hash() {
t.Errorf("parent hash mismatch %x - %x", prev.ParentHash(), block.Hash())
if full {
if ncm.CurrentBlock().Hash() != blocks[2].Header().Hash() {
t.Errorf("last block hash mismatch: have: %x, want %x", ncm.CurrentBlock().Hash(), blocks[2].Header().Hash())
}
if blocks[2].Header().GasLimit.Cmp(ncm.GasLimit()) != 0 {
t.Errorf("last block gasLimit mismatch: have: %x, want %x", ncm.GasLimit(), blocks[2].Header().GasLimit)
}
} else {
if ncm.CurrentHeader().Hash() != headers[2].Hash() {
t.Errorf("last header hash mismatch: have: %x, want %x", ncm.CurrentHeader().Hash(), headers[2].Hash())
}
}
}
func TestInsertNonceError(t *testing.T) {
// Tests chain insertions in the face of one entity containing an invalid nonce.
func TestHeadersInsertNonceError(t *testing.T) { testInsertNonceError(t, false) }
func TestBlocksInsertNonceError(t *testing.T) { testInsertNonceError(t, true) }
func testInsertNonceError(t *testing.T, full bool) {
for i := 1; i < 25 && !t.Failed(); i++ {
db, _ := ethdb.NewMemDatabase()
genesis, err := WriteTestNetGenesisBlock(db, 0)
// Create a pristine chain and database
db, processor, err := newCanonical(0, full)
if err != nil {
t.Error(err)
t.FailNow()
t.Fatalf("failed to create pristine chain: %v", err)
}
bc := chm(genesis, db)
bc.processor = NewBlockProcessor(db, bc.pow, bc, bc.eventMux)
blocks := makeChain(bc.currentBlock, i, db, 0)
bc := processor.bc
fail := rand.Int() % len(blocks)
failblock := blocks[fail]
bc.pow = failPow{failblock.NumberU64()}
n, err := bc.InsertChain(blocks)
// Create and insert a chain with a failing nonce
var (
failAt int
failRes int
failNum uint64
failHash common.Hash
)
if full {
blocks := makeBlockChain(processor.bc.CurrentBlock(), i, db, 0)
failAt = rand.Int() % len(blocks)
failNum = blocks[failAt].NumberU64()
failHash = blocks[failAt].Hash()
processor.bc.pow = failPow{failNum}
processor.Pow = failPow{failNum}
failRes, err = processor.bc.InsertChain(blocks)
} else {
headers := makeHeaderChain(processor.bc.CurrentHeader(), i, db, 0)
failAt = rand.Int() % len(headers)
failNum = headers[failAt].Number.Uint64()
failHash = headers[failAt].Hash()
processor.bc.pow = failPow{failNum}
processor.Pow = failPow{failNum}
failRes, err = processor.bc.InsertHeaderChain(headers, 1)
}
// Check that the returned error indicates the nonce failure.
if n != fail {
t.Errorf("(i=%d) wrong failed block index: got %d, want %d", i, n, fail)
if failRes != failAt {
t.Errorf("test %d: failure index mismatch: have %d, want %d", i, failRes, failAt)
}
if !IsBlockNonceErr(err) {
t.Fatalf("(i=%d) got %q, want a nonce error", i, err)
t.Fatalf("test %d: error mismatch: have %v, want nonce error", i, err)
}
nerr := err.(*BlockNonceErr)
if nerr.Number.Cmp(failblock.Number()) != 0 {
t.Errorf("(i=%d) wrong block number in error, got %v, want %v", i, nerr.Number, failblock.Number())
if nerr.Number.Uint64() != failNum {
t.Errorf("test %d: number mismatch: have %v, want %v", i, nerr.Number, failNum)
}
if nerr.Hash != failblock.Hash() {
t.Errorf("(i=%d) wrong block hash in error, got %v, want %v", i, nerr.Hash, failblock.Hash())
if nerr.Hash != failHash {
t.Errorf("test %d: hash mismatch: have %x, want %x", i, nerr.Hash[:4], failHash[:4])
}
// Check that all no blocks after the failing block have been inserted.
for _, block := range blocks[fail:] {
if bc.HasBlock(block.Hash()) {
t.Errorf("(i=%d) invalid block %d present in chain", i, block.NumberU64())
for j := 0; j < i-failAt; j++ {
if full {
if block := bc.GetBlockByNumber(failNum + uint64(j)); block != nil {
t.Errorf("test %d: invalid block in chain: %v", i, block)
}
} else {
if header := bc.GetHeaderByNumber(failNum + uint64(j)); header != nil {
t.Errorf("test %d: invalid header in chain: %v", i, header)
}
}
}
}
}
// Tests that fast importing a block chain produces the same chain data as the
// classical full block processing.
func TestFastVsFullChains(t *testing.T) {
// Configure and generate a sample block chain
var (
gendb, _ = ethdb.NewMemDatabase()
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(1000000000)
genesis = GenesisBlockForTesting(gendb, address, funds)
)
blocks, receipts := GenerateChain(genesis, gendb, 1024, func(i int, block *BlockGen) {
block.SetCoinbase(common.Address{0x00})
// If the block number is multiple of 3, send a few bonus transactions to the miner
if i%3 == 2 {
for j := 0; j < i%4+1; j++ {
tx, err := types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key)
if err != nil {
panic(err)
}
block.AddTx(tx)
}
}
// If the block number is a multiple of 5, add a few bonus uncles to the block
if i%5 == 5 {
block.AddUncle(&types.Header{ParentHash: block.PrevBlock(i - 1).Hash(), Number: big.NewInt(int64(i - 1))})
}
})
// Import the chain as an archive node for the comparison baseline
archiveDb, _ := ethdb.NewMemDatabase()
WriteGenesisBlockForTesting(archiveDb, GenesisAccount{address, funds})
archive, _ := NewBlockChain(archiveDb, FakePow{}, new(event.TypeMux))
archive.SetProcessor(NewBlockProcessor(archiveDb, FakePow{}, archive, new(event.TypeMux)))
if n, err := archive.InsertChain(blocks); err != nil {
t.Fatalf("failed to process block %d: %v", n, err)
}
// Fast import the chain as a non-archive node to test
fastDb, _ := ethdb.NewMemDatabase()
WriteGenesisBlockForTesting(fastDb, GenesisAccount{address, funds})
fast, _ := NewBlockChain(fastDb, FakePow{}, new(event.TypeMux))
fast.SetProcessor(NewBlockProcessor(fastDb, FakePow{}, fast, new(event.TypeMux)))
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := fast.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
// Iterate over all chain data components, and cross reference
for i := 0; i < len(blocks); i++ {
num, hash := blocks[i].NumberU64(), blocks[i].Hash()
if ftd, atd := fast.GetTd(hash), archive.GetTd(hash); ftd.Cmp(atd) != 0 {
t.Errorf("block #%d [%x]: td mismatch: have %v, want %v", num, hash, ftd, atd)
}
if fheader, aheader := fast.GetHeader(hash), archive.GetHeader(hash); fheader.Hash() != aheader.Hash() {
t.Errorf("block #%d [%x]: header mismatch: have %v, want %v", num, hash, fheader, aheader)
}
if fblock, ablock := fast.GetBlock(hash), archive.GetBlock(hash); fblock.Hash() != ablock.Hash() {
t.Errorf("block #%d [%x]: block mismatch: have %v, want %v", num, hash, fblock, ablock)
} else if types.DeriveSha(fblock.Transactions()) != types.DeriveSha(ablock.Transactions()) {
t.Errorf("block #%d [%x]: transactions mismatch: have %v, want %v", num, hash, fblock.Transactions(), ablock.Transactions())
} else if types.CalcUncleHash(fblock.Uncles()) != types.CalcUncleHash(ablock.Uncles()) {
t.Errorf("block #%d [%x]: uncles mismatch: have %v, want %v", num, hash, fblock.Uncles(), ablock.Uncles())
}
if freceipts, areceipts := GetBlockReceipts(fastDb, hash), GetBlockReceipts(archiveDb, hash); types.DeriveSha(freceipts) != types.DeriveSha(areceipts) {
t.Errorf("block #%d [%x]: receipts mismatch: have %v, want %v", num, hash, freceipts, areceipts)
}
}
// Check that the canonical chains are the same between the databases
for i := 0; i < len(blocks)+1; i++ {
if fhash, ahash := GetCanonicalHash(fastDb, uint64(i)), GetCanonicalHash(archiveDb, uint64(i)); fhash != ahash {
t.Errorf("block #%d: canonical hash mismatch: have %v, want %v", i, fhash, ahash)
}
}
}
// Tests that various import methods move the chain head pointers to the correct
// positions.
func TestLightVsFastVsFullChainHeads(t *testing.T) {
// Configure and generate a sample block chain
var (
gendb, _ = ethdb.NewMemDatabase()
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(1000000000)
genesis = GenesisBlockForTesting(gendb, address, funds)
)
height := uint64(1024)
blocks, receipts := GenerateChain(genesis, gendb, int(height), nil)
// Configure a subchain to roll back
remove := []common.Hash{}
for _, block := range blocks[height/2:] {
remove = append(remove, block.Hash())
}
// Create a small assertion method to check the three heads
assert := func(t *testing.T, kind string, chain *BlockChain, header uint64, fast uint64, block uint64) {
if num := chain.CurrentBlock().NumberU64(); num != block {
t.Errorf("%s head block mismatch: have #%v, want #%v", kind, num, block)
}
if num := chain.CurrentFastBlock().NumberU64(); num != fast {
t.Errorf("%s head fast-block mismatch: have #%v, want #%v", kind, num, fast)
}
if num := chain.CurrentHeader().Number.Uint64(); num != header {
t.Errorf("%s head header mismatch: have #%v, want #%v", kind, num, header)
}
}
// Import the chain as an archive node and ensure all pointers are updated
archiveDb, _ := ethdb.NewMemDatabase()
WriteGenesisBlockForTesting(archiveDb, GenesisAccount{address, funds})
archive, _ := NewBlockChain(archiveDb, FakePow{}, new(event.TypeMux))
archive.SetProcessor(NewBlockProcessor(archiveDb, FakePow{}, archive, new(event.TypeMux)))
if n, err := archive.InsertChain(blocks); err != nil {
t.Fatalf("failed to process block %d: %v", n, err)
}
assert(t, "archive", archive, height, height, height)
archive.Rollback(remove)
assert(t, "archive", archive, height/2, height/2, height/2)
// Import the chain as a non-archive node and ensure all pointers are updated
fastDb, _ := ethdb.NewMemDatabase()
WriteGenesisBlockForTesting(fastDb, GenesisAccount{address, funds})
fast, _ := NewBlockChain(fastDb, FakePow{}, new(event.TypeMux))
fast.SetProcessor(NewBlockProcessor(fastDb, FakePow{}, fast, new(event.TypeMux)))
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := fast.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := fast.InsertReceiptChain(blocks, receipts); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
assert(t, "fast", fast, height, height, 0)
fast.Rollback(remove)
assert(t, "fast", fast, height/2, height/2, 0)
// Import the chain as a light node and ensure all pointers are updated
lightDb, _ := ethdb.NewMemDatabase()
WriteGenesisBlockForTesting(lightDb, GenesisAccount{address, funds})
light, _ := NewBlockChain(lightDb, FakePow{}, new(event.TypeMux))
light.SetProcessor(NewBlockProcessor(lightDb, FakePow{}, light, new(event.TypeMux)))
if n, err := light.InsertHeaderChain(headers, 1); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
assert(t, "light", light, height, 0, 0)
light.Rollback(remove)
assert(t, "light", light, height/2, 0, 0)
}
// Tests that chain reorganizations handle transaction removals and reinsertions.
func TestChainTxReorgs(t *testing.T) {
params.MinGasLimit = big.NewInt(125000) // Minimum the gas limit may ever be.
@ -587,7 +876,7 @@ func TestChainTxReorgs(t *testing.T) {
// - futureAdd: transaction added after the reorg has already finished
var pastAdd, freshAdd, futureAdd *types.Transaction
chain := GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) {
chain, _ := GenerateChain(genesis, db, 3, func(i int, gen *BlockGen) {
switch i {
case 0:
pastDrop, _ = types.NewTransaction(gen.TxNonce(addr2), addr2, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key2)
@ -613,7 +902,7 @@ func TestChainTxReorgs(t *testing.T) {
}
// overwrite the old chain
chain = GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
chain, _ = GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
switch i {
case 0:
pastAdd, _ = types.NewTransaction(gen.TxNonce(addr3), addr3, big.NewInt(1000), params.TxGas, nil, nil).SignECDSA(key3)

View File

@ -98,7 +98,7 @@ func (b *BlockGen) AddTx(tx *types.Transaction) {
b.header.GasUsed.Add(b.header.GasUsed, gas)
receipt := types.NewReceipt(root.Bytes(), b.header.GasUsed)
logs := b.statedb.GetLogs(tx.Hash())
receipt.SetLogs(logs)
receipt.Logs = logs
receipt.Bloom = types.CreateBloom(types.Receipts{receipt})
b.txs = append(b.txs, tx)
b.receipts = append(b.receipts, receipt)
@ -163,13 +163,13 @@ func (b *BlockGen) OffsetTime(seconds int64) {
// Blocks created by GenerateChain do not contain valid proof of work
// values. Inserting them into BlockChain requires use of FakePow or
// a similar non-validating proof of work implementation.
func GenerateChain(parent *types.Block, db ethdb.Database, n int, gen func(int, *BlockGen)) []*types.Block {
func GenerateChain(parent *types.Block, db ethdb.Database, n int, gen func(int, *BlockGen)) ([]*types.Block, []types.Receipts) {
statedb, err := state.New(parent.Root(), db)
if err != nil {
panic(err)
}
blocks := make(types.Blocks, n)
genblock := func(i int, h *types.Header) *types.Block {
blocks, receipts := make(types.Blocks, n), make([]types.Receipts, n)
genblock := func(i int, h *types.Header) (*types.Block, types.Receipts) {
b := &BlockGen{parent: parent, i: i, chain: blocks, header: h, statedb: statedb}
if gen != nil {
gen(i, b)
@ -180,15 +180,16 @@ func GenerateChain(parent *types.Block, db ethdb.Database, n int, gen func(int,
panic(fmt.Sprintf("state write error: %v", err))
}
h.Root = root
return types.NewBlock(h, b.txs, b.uncles, b.receipts)
return types.NewBlock(h, b.txs, b.uncles, b.receipts), b.receipts
}
for i := 0; i < n; i++ {
header := makeHeader(parent, statedb)
block := genblock(i, header)
block, receipt := genblock(i, header)
blocks[i] = block
receipts[i] = receipt
parent = block
}
return blocks
return blocks, receipts
}
func makeHeader(parent *types.Block, state *state.StateDB) *types.Header {
@ -210,26 +211,51 @@ func makeHeader(parent *types.Block, state *state.StateDB) *types.Header {
}
}
// newCanonical creates a new deterministic canonical chain by running
// InsertChain on the result of makeChain.
func newCanonical(n int, db ethdb.Database) (*BlockProcessor, error) {
// newCanonical creates a chain database, and injects a deterministic canonical
// chain. Depending on the full flag, if creates either a full block chain or a
// header only chain.
func newCanonical(n int, full bool) (ethdb.Database, *BlockProcessor, error) {
// Create te new chain database
db, _ := ethdb.NewMemDatabase()
evmux := &event.TypeMux{}
WriteTestNetGenesisBlock(db, 0)
chainman, _ := NewBlockChain(db, FakePow{}, evmux)
bman := NewBlockProcessor(db, FakePow{}, chainman, evmux)
bman.bc.SetProcessor(bman)
parent := bman.bc.CurrentBlock()
// Initialize a fresh chain with only a genesis block
genesis, _ := WriteTestNetGenesisBlock(db, 0)
blockchain, _ := NewBlockChain(db, FakePow{}, evmux)
processor := NewBlockProcessor(db, FakePow{}, blockchain, evmux)
processor.bc.SetProcessor(processor)
// Create and inject the requested chain
if n == 0 {
return bman, nil
return db, processor, nil
}
lchain := makeChain(parent, n, db, canonicalSeed)
_, err := bman.bc.InsertChain(lchain)
return bman, err
if full {
// Full block-chain requested
blocks := makeBlockChain(genesis, n, db, canonicalSeed)
_, err := blockchain.InsertChain(blocks)
return db, processor, err
}
// Header-only chain requested
headers := makeHeaderChain(genesis.Header(), n, db, canonicalSeed)
_, err := blockchain.InsertHeaderChain(headers, 1)
return db, processor, err
}
func makeChain(parent *types.Block, n int, db ethdb.Database, seed int) []*types.Block {
return GenerateChain(parent, db, n, func(i int, b *BlockGen) {
// makeHeaderChain creates a deterministic chain of headers rooted at parent.
func makeHeaderChain(parent *types.Header, n int, db ethdb.Database, seed int) []*types.Header {
blocks := makeBlockChain(types.NewBlockWithHeader(parent), n, db, seed)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
return headers
}
// makeBlockChain creates a deterministic chain of blocks rooted at parent.
func makeBlockChain(parent *types.Block, n int, db ethdb.Database, seed int) []*types.Block {
blocks, _ := GenerateChain(parent, db, n, func(i int, b *BlockGen) {
b.SetCoinbase(common.Address{0: byte(seed), 19: byte(i)})
})
return blocks
}

View File

@ -47,7 +47,7 @@ func ExampleGenerateChain() {
// This call generates a chain of 5 blocks. The function runs for
// each block and adds different features to gen based on the
// block index.
chain := GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
chain, _ := GenerateChain(genesis, db, 5, func(i int, gen *BlockGen) {
switch i {
case 0:
// In block 1, addr1 sends addr2 some ether.

View File

@ -60,7 +60,7 @@ func TestPowVerification(t *testing.T) {
var (
testdb, _ = ethdb.NewMemDatabase()
genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int))
blocks = GenerateChain(genesis, testdb, 8, nil)
blocks, _ = GenerateChain(genesis, testdb, 8, nil)
)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
@ -115,7 +115,7 @@ func testPowConcurrentVerification(t *testing.T, threads int) {
var (
testdb, _ = ethdb.NewMemDatabase()
genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int))
blocks = GenerateChain(genesis, testdb, 8, nil)
blocks, _ = GenerateChain(genesis, testdb, 8, nil)
)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
@ -186,7 +186,7 @@ func testPowConcurrentAbortion(t *testing.T, threads int) {
var (
testdb, _ = ethdb.NewMemDatabase()
genesis = GenesisBlockForTesting(testdb, common.Address{}, new(big.Int))
blocks = GenerateChain(genesis, testdb, 1024, nil)
blocks, _ = GenerateChain(genesis, testdb, 1024, nil)
)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {

View File

@ -34,6 +34,7 @@ import (
var (
headHeaderKey = []byte("LastHeader")
headBlockKey = []byte("LastBlock")
headFastKey = []byte("LastFast")
blockPrefix = []byte("block-")
blockNumPrefix = []byte("block-num-")
@ -129,7 +130,7 @@ func GetCanonicalHash(db ethdb.Database, number uint64) common.Hash {
// header. The difference between this and GetHeadBlockHash is that whereas the
// last block hash is only updated upon a full block import, the last header
// hash is updated already at header import, allowing head tracking for the
// fast synchronization mechanism.
// light synchronization mechanism.
func GetHeadHeaderHash(db ethdb.Database) common.Hash {
data, _ := db.Get(headHeaderKey)
if len(data) == 0 {
@ -147,6 +148,18 @@ func GetHeadBlockHash(db ethdb.Database) common.Hash {
return common.BytesToHash(data)
}
// GetHeadFastBlockHash retrieves the hash of the current canonical head block during
// fast synchronization. The difference between this and GetHeadBlockHash is that
// whereas the last block hash is only updated upon a full block import, the last
// fast hash is updated when importing pre-processed blocks.
func GetHeadFastBlockHash(db ethdb.Database) common.Hash {
data, _ := db.Get(headFastKey)
if len(data) == 0 {
return common.Hash{}
}
return common.BytesToHash(data)
}
// GetHeaderRLP retrieves a block header in its raw RLP database encoding, or nil
// if the header's not found.
func GetHeaderRLP(db ethdb.Database, hash common.Hash) rlp.RawValue {
@ -249,6 +262,15 @@ func WriteHeadBlockHash(db ethdb.Database, hash common.Hash) error {
return nil
}
// WriteHeadFastBlockHash stores the fast head block's hash.
func WriteHeadFastBlockHash(db ethdb.Database, hash common.Hash) error {
if err := db.Put(headFastKey, hash.Bytes()); err != nil {
glog.Fatalf("failed to store last fast block's hash into database: %v", err)
return err
}
return nil
}
// WriteHeader serializes a block header into the database.
func WriteHeader(db ethdb.Database, header *types.Header) error {
data, err := rlp.EncodeToBytes(header)
@ -372,7 +394,7 @@ func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts)
bloomDat, _ := db.Get(key)
bloom := types.BytesToBloom(bloomDat)
for _, receipt := range receipts {
for _, log := range receipt.Logs() {
for _, log := range receipt.Logs {
bloom.Add(log.Address.Big())
}
}

View File

@ -163,7 +163,12 @@ func TestBlockStorage(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
// Create a test block to move around the database and make sure it's really new
block := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block")})
block := types.NewBlockWithHeader(&types.Header{
Extra: []byte("test block"),
UncleHash: types.EmptyUncleHash,
TxHash: types.EmptyRootHash,
ReceiptHash: types.EmptyRootHash,
})
if entry := GetBlock(db, block.Hash()); entry != nil {
t.Fatalf("Non existent block returned: %v", entry)
}
@ -208,8 +213,12 @@ func TestBlockStorage(t *testing.T) {
// Tests that partial block contents don't get reassembled into full blocks.
func TestPartialBlockStorage(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
block := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block")})
block := types.NewBlockWithHeader(&types.Header{
Extra: []byte("test block"),
UncleHash: types.EmptyUncleHash,
TxHash: types.EmptyRootHash,
ReceiptHash: types.EmptyRootHash,
})
// Store a header and check that it's not recognized as a block
if err := WriteHeader(db, block.Header()); err != nil {
t.Fatalf("Failed to write header into database: %v", err)
@ -298,6 +307,7 @@ func TestHeadStorage(t *testing.T) {
blockHead := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block header")})
blockFull := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block full")})
blockFast := types.NewBlockWithHeader(&types.Header{Extra: []byte("test block fast")})
// Check that no head entries are in a pristine database
if entry := GetHeadHeaderHash(db); entry != (common.Hash{}) {
@ -306,6 +316,9 @@ func TestHeadStorage(t *testing.T) {
if entry := GetHeadBlockHash(db); entry != (common.Hash{}) {
t.Fatalf("Non head block entry returned: %v", entry)
}
if entry := GetHeadFastBlockHash(db); entry != (common.Hash{}) {
t.Fatalf("Non fast head block entry returned: %v", entry)
}
// Assign separate entries for the head header and block
if err := WriteHeadHeaderHash(db, blockHead.Hash()); err != nil {
t.Fatalf("Failed to write head header hash: %v", err)
@ -313,6 +326,9 @@ func TestHeadStorage(t *testing.T) {
if err := WriteHeadBlockHash(db, blockFull.Hash()); err != nil {
t.Fatalf("Failed to write head block hash: %v", err)
}
if err := WriteHeadFastBlockHash(db, blockFast.Hash()); err != nil {
t.Fatalf("Failed to write fast head block hash: %v", err)
}
// Check that both heads are present, and different (i.e. two heads maintained)
if entry := GetHeadHeaderHash(db); entry != blockHead.Hash() {
t.Fatalf("Head header hash mismatch: have %v, want %v", entry, blockHead.Hash())
@ -320,21 +336,24 @@ func TestHeadStorage(t *testing.T) {
if entry := GetHeadBlockHash(db); entry != blockFull.Hash() {
t.Fatalf("Head block hash mismatch: have %v, want %v", entry, blockFull.Hash())
}
if entry := GetHeadFastBlockHash(db); entry != blockFast.Hash() {
t.Fatalf("Fast head block hash mismatch: have %v, want %v", entry, blockFast.Hash())
}
}
func TestMipmapBloom(t *testing.T) {
db, _ := ethdb.NewMemDatabase()
receipt1 := new(types.Receipt)
receipt1.SetLogs(vm.Logs{
receipt1.Logs = vm.Logs{
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
&vm.Log{Address: common.BytesToAddress([]byte("address"))},
})
}
receipt2 := new(types.Receipt)
receipt2.SetLogs(vm.Logs{
receipt2.Logs = vm.Logs{
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
&vm.Log{Address: common.BytesToAddress([]byte("address1"))},
})
}
WriteMipmapBloom(db, 1, types.Receipts{receipt1})
WriteMipmapBloom(db, 2, types.Receipts{receipt2})
@ -349,15 +368,15 @@ func TestMipmapBloom(t *testing.T) {
// reset
db, _ = ethdb.NewMemDatabase()
receipt := new(types.Receipt)
receipt.SetLogs(vm.Logs{
receipt.Logs = vm.Logs{
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
})
}
WriteMipmapBloom(db, 999, types.Receipts{receipt1})
receipt = new(types.Receipt)
receipt.SetLogs(vm.Logs{
receipt.Logs = vm.Logs{
&vm.Log{Address: common.BytesToAddress([]byte("test 1"))},
})
}
WriteMipmapBloom(db, 1000, types.Receipts{receipt})
bloom := GetMipmapBloom(db, 1000, 1000)
@ -384,22 +403,22 @@ func TestMipmapChain(t *testing.T) {
defer db.Close()
genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr, big.NewInt(1000000)})
chain := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) {
chain, receipts := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) {
var receipts types.Receipts
switch i {
case 1:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.SetLogs(vm.Logs{
receipt.Logs = vm.Logs{
&vm.Log{
Address: addr,
Topics: []common.Hash{hash1},
},
})
}
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}
case 1000:
receipt := types.NewReceipt(nil, new(big.Int))
receipt.SetLogs(vm.Logs{&vm.Log{Address: addr2}})
receipt.Logs = vm.Logs{&vm.Log{Address: addr2}}
gen.AddUncheckedReceipt(receipt)
receipts = types.Receipts{receipt}
@ -412,7 +431,7 @@ func TestMipmapChain(t *testing.T) {
}
WriteMipmapBloom(db, uint64(i+1), receipts)
})
for _, block := range chain {
for i, block := range chain {
WriteBlock(db, block)
if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
@ -420,7 +439,7 @@ func TestMipmapChain(t *testing.T) {
if err := WriteHeadBlockHash(db, block.Hash()); err != nil {
t.Fatalf("failed to insert block number: %v", err)
}
if err := PutBlockReceipts(db, block, block.Receipts()); err != nil {
if err := PutBlockReceipts(db, block.Hash(), receipts[i]); err != nil {
t.Fatal("error writing block receipts:", err)
}
}

View File

@ -111,7 +111,7 @@ type BlockNonceErr struct {
}
func (err *BlockNonceErr) Error() string {
return fmt.Sprintf("block %d (%v) nonce is invalid (got %d)", err.Number, err.Hash, err.Nonce)
return fmt.Sprintf("nonce for #%d [%x…] is invalid (got %d)", err.Number, err.Hash, err.Nonce)
}
// IsBlockNonceErr returns true for invalid block nonce errors.

View File

@ -103,7 +103,7 @@ func WriteGenesisBlock(chainDb ethdb.Database, reader io.Reader) (*types.Block,
if err := WriteBlock(chainDb, block); err != nil {
return nil, err
}
if err := PutBlockReceipts(chainDb, block, nil); err != nil {
if err := PutBlockReceipts(chainDb, block.Hash(), nil); err != nil {
return nil, err
}
if err := WriteCanonicalHash(chainDb, block.Hash(), block.NumberU64()); err != nil {

70
core/state/sync.go Normal file
View File

@ -0,0 +1,70 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
import (
"bytes"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
// StateSync is the main state synchronisation scheduler, which provides yet the
// unknown state hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the state database step by step until all is done.
type StateSync trie.TrieSync
// NewStateSync create a new state trie download scheduler.
func NewStateSync(root common.Hash, database ethdb.Database) *StateSync {
var syncer *trie.TrieSync
callback := func(leaf []byte, parent common.Hash) error {
var obj struct {
Nonce uint64
Balance *big.Int
Root common.Hash
CodeHash []byte
}
if err := rlp.Decode(bytes.NewReader(leaf), &obj); err != nil {
return err
}
syncer.AddSubTrie(obj.Root, 64, parent, nil)
syncer.AddRawEntry(common.BytesToHash(obj.CodeHash), 64, parent)
return nil
}
syncer = trie.NewTrieSync(root, database, callback)
return (*StateSync)(syncer)
}
// Missing retrieves the known missing nodes from the state trie for retrieval.
func (s *StateSync) Missing(max int) []common.Hash {
return (*trie.TrieSync)(s).Missing(max)
}
// Process injects a batch of retrieved trie nodes data.
func (s *StateSync) Process(list []trie.SyncResult) (int, error) {
return (*trie.TrieSync)(s).Process(list)
}
// Pending returns the number of state entries currently pending for download.
func (s *StateSync) Pending() int {
return (*trie.TrieSync)(s).Pending()
}

238
core/state/sync_test.go Normal file
View File

@ -0,0 +1,238 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
import (
"bytes"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/trie"
)
// testAccount is the data associated with an account used by the state tests.
type testAccount struct {
address common.Address
balance *big.Int
nonce uint64
code []byte
}
// makeTestState create a sample test state to test node-wise reconstruction.
func makeTestState() (ethdb.Database, common.Hash, []*testAccount) {
// Create an empty state
db, _ := ethdb.NewMemDatabase()
state, _ := New(common.Hash{}, db)
// Fill it with some arbitrary data
accounts := []*testAccount{}
for i := byte(0); i < 255; i++ {
obj := state.GetOrNewStateObject(common.BytesToAddress([]byte{i}))
acc := &testAccount{address: common.BytesToAddress([]byte{i})}
obj.AddBalance(big.NewInt(int64(11 * i)))
acc.balance = big.NewInt(int64(11 * i))
obj.SetNonce(uint64(42 * i))
acc.nonce = uint64(42 * i)
if i%3 == 0 {
obj.SetCode([]byte{i, i, i, i, i})
acc.code = []byte{i, i, i, i, i}
}
state.UpdateStateObject(obj)
accounts = append(accounts, acc)
}
root, _ := state.Commit()
// Return the generated state
return db, root, accounts
}
// checkStateAccounts cross references a reconstructed state with an expected
// account array.
func checkStateAccounts(t *testing.T, db ethdb.Database, root common.Hash, accounts []*testAccount) {
state, _ := New(root, db)
for i, acc := range accounts {
if balance := state.GetBalance(acc.address); balance.Cmp(acc.balance) != 0 {
t.Errorf("account %d: balance mismatch: have %v, want %v", i, balance, acc.balance)
}
if nonce := state.GetNonce(acc.address); nonce != acc.nonce {
t.Errorf("account %d: nonce mismatch: have %v, want %v", i, nonce, acc.nonce)
}
if code := state.GetCode(acc.address); bytes.Compare(code, acc.code) != 0 {
t.Errorf("account %d: code mismatch: have %x, want %x", i, code, acc.code)
}
}
}
// Tests that an empty state is not scheduled for syncing.
func TestEmptyStateSync(t *testing.T) {
empty := common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
db, _ := ethdb.NewMemDatabase()
if req := NewStateSync(empty, db).Missing(1); len(req) != 0 {
t.Errorf("content requested for empty state: %v", req)
}
}
// Tests that given a root hash, a state can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go.
func TestIterativeStateSyncIndividual(t *testing.T) { testIterativeStateSync(t, 1) }
func TestIterativeStateSyncBatched(t *testing.T) { testIterativeStateSync(t, 100) }
func testIterativeStateSync(t *testing.T, batch int) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()
// Create a destination state and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewStateSync(srcRoot, dstDb)
queue := append([]common.Hash{}, sched.Missing(batch)...)
for len(queue) > 0 {
results := make([]trie.SyncResult, len(queue))
for i, hash := range queue {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results[i] = trie.SyncResult{hash, data}
}
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[:0], sched.Missing(batch)...)
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
}
// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned, and the others sent only later.
func TestIterativeDelayedStateSync(t *testing.T) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()
// Create a destination state and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewStateSync(srcRoot, dstDb)
queue := append([]common.Hash{}, sched.Missing(0)...)
for len(queue) > 0 {
// Sync only half of the scheduled nodes
results := make([]trie.SyncResult, len(queue)/2+1)
for i, hash := range queue[:len(results)] {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results[i] = trie.SyncResult{hash, data}
}
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = append(queue[len(results):], sched.Missing(0)...)
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
}
// Tests that given a root hash, a trie can sync iteratively on a single thread,
// requesting retrieval tasks and returning all of them in one go, however in a
// random order.
func TestIterativeRandomStateSyncIndividual(t *testing.T) { testIterativeRandomStateSync(t, 1) }
func TestIterativeRandomStateSyncBatched(t *testing.T) { testIterativeRandomStateSync(t, 100) }
func testIterativeRandomStateSync(t *testing.T, batch int) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()
// Create a destination state and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewStateSync(srcRoot, dstDb)
queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
}
for len(queue) > 0 {
// Fetch all the queued nodes in a random order
results := make([]trie.SyncResult, 0, len(queue))
for hash, _ := range queue {
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results = append(results, trie.SyncResult{hash, data})
}
// Feed the retrieved results back and queue new tasks
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
queue = make(map[common.Hash]struct{})
for _, hash := range sched.Missing(batch) {
queue[hash] = struct{}{}
}
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
}
// Tests that the trie scheduler can correctly reconstruct the state even if only
// partial results are returned (Even those randomly), others sent only later.
func TestIterativeRandomDelayedStateSync(t *testing.T) {
// Create a random state to copy
srcDb, srcRoot, srcAccounts := makeTestState()
// Create a destination state and sync with the scheduler
dstDb, _ := ethdb.NewMemDatabase()
sched := NewStateSync(srcRoot, dstDb)
queue := make(map[common.Hash]struct{})
for _, hash := range sched.Missing(0) {
queue[hash] = struct{}{}
}
for len(queue) > 0 {
// Sync only half of the scheduled nodes, even those in random order
results := make([]trie.SyncResult, 0, len(queue)/2+1)
for hash, _ := range queue {
delete(queue, hash)
data, err := srcDb.Get(hash.Bytes())
if err != nil {
t.Fatalf("failed to retrieve node data for %x: %v", hash, err)
}
results = append(results, trie.SyncResult{hash, data})
if len(results) >= cap(results) {
break
}
}
// Feed the retrieved results back and queue new tasks
if index, err := sched.Process(results); err != nil {
t.Fatalf("failed to process result #%d: %v", index, err)
}
for _, hash := range sched.Missing(0) {
queue[hash] = struct{}{}
}
}
// Cross check that the two states are in sync
checkStateAccounts(t, dstDb, srcRoot, srcAccounts)
}

View File

@ -140,11 +140,14 @@ func GetBlockReceipts(db ethdb.Database, hash common.Hash) types.Receipts {
if len(data) == 0 {
return nil
}
var receipts types.Receipts
err := rlp.DecodeBytes(data, &receipts)
if err != nil {
glog.V(logger.Core).Infoln("GetReceiptse err", err)
rs := []*types.ReceiptForStorage{}
if err := rlp.DecodeBytes(data, &rs); err != nil {
glog.V(logger.Error).Infof("invalid receipt array RLP for hash %x: %v", hash, err)
return nil
}
receipts := make(types.Receipts, len(rs))
for i, receipt := range rs {
receipts[i] = (*types.Receipt)(receipt)
}
return receipts
}
@ -152,7 +155,7 @@ func GetBlockReceipts(db ethdb.Database, hash common.Hash) types.Receipts {
// PutBlockReceipts stores the block's transactions associated receipts
// and stores them by block hash in a single slice. This is required for
// forks and chain reorgs
func PutBlockReceipts(db ethdb.Database, block *types.Block, receipts types.Receipts) error {
func PutBlockReceipts(db ethdb.Database, hash common.Hash, receipts types.Receipts) error {
rs := make([]*types.ReceiptForStorage, len(receipts))
for i, receipt := range receipts {
rs[i] = (*types.ReceiptForStorage)(receipt)
@ -161,12 +164,9 @@ func PutBlockReceipts(db ethdb.Database, block *types.Block, receipts types.Rece
if err != nil {
return err
}
hash := block.Hash()
err = db.Put(append(blockReceiptsPre, hash[:]...), bytes)
if err != nil {
return err
}
return nil
}

View File

@ -128,7 +128,6 @@ type Block struct {
header *Header
uncles []*Header
transactions Transactions
receipts Receipts
// caches
hash atomic.Value
@ -172,8 +171,8 @@ type storageblock struct {
}
var (
emptyRootHash = DeriveSha(Transactions{})
emptyUncleHash = CalcUncleHash(nil)
EmptyRootHash = DeriveSha(Transactions{})
EmptyUncleHash = CalcUncleHash(nil)
)
// NewBlock creates a new block. The input data is copied,
@ -184,11 +183,11 @@ var (
// are ignored and set to values derived from the given txs, uncles
// and receipts.
func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*Receipt) *Block {
b := &Block{header: copyHeader(header), td: new(big.Int)}
b := &Block{header: CopyHeader(header), td: new(big.Int)}
// TODO: panic if len(txs) != len(receipts)
if len(txs) == 0 {
b.header.TxHash = emptyRootHash
b.header.TxHash = EmptyRootHash
} else {
b.header.TxHash = DeriveSha(Transactions(txs))
b.transactions = make(Transactions, len(txs))
@ -196,21 +195,19 @@ func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*
}
if len(receipts) == 0 {
b.header.ReceiptHash = emptyRootHash
b.header.ReceiptHash = EmptyRootHash
} else {
b.header.ReceiptHash = DeriveSha(Receipts(receipts))
b.header.Bloom = CreateBloom(receipts)
b.receipts = make([]*Receipt, len(receipts))
copy(b.receipts, receipts)
}
if len(uncles) == 0 {
b.header.UncleHash = emptyUncleHash
b.header.UncleHash = EmptyUncleHash
} else {
b.header.UncleHash = CalcUncleHash(uncles)
b.uncles = make([]*Header, len(uncles))
for i := range uncles {
b.uncles[i] = copyHeader(uncles[i])
b.uncles[i] = CopyHeader(uncles[i])
}
}
@ -221,10 +218,12 @@ func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*
// header data is copied, changes to header and to the field values
// will not affect the block.
func NewBlockWithHeader(header *Header) *Block {
return &Block{header: copyHeader(header)}
return &Block{header: CopyHeader(header)}
}
func copyHeader(h *Header) *Header {
// CopyHeader creates a deep copy of a block header to prevent side effects from
// modifying a header variable.
func CopyHeader(h *Header) *Header {
cpy := *h
if cpy.Time = new(big.Int); h.Time != nil {
cpy.Time.Set(h.Time)
@ -297,7 +296,6 @@ func (b *StorageBlock) DecodeRLP(s *rlp.Stream) error {
// TODO: copies
func (b *Block) Uncles() []*Header { return b.uncles }
func (b *Block) Transactions() Transactions { return b.transactions }
func (b *Block) Receipts() Receipts { return b.receipts }
func (b *Block) Transaction(hash common.Hash) *Transaction {
for _, transaction := range b.transactions {
@ -326,7 +324,7 @@ func (b *Block) ReceiptHash() common.Hash { return b.header.ReceiptHash }
func (b *Block) UncleHash() common.Hash { return b.header.UncleHash }
func (b *Block) Extra() []byte { return common.CopyBytes(b.header.Extra) }
func (b *Block) Header() *Header { return copyHeader(b.header) }
func (b *Block) Header() *Header { return CopyHeader(b.header) }
func (b *Block) HashNoNonce() common.Hash {
return b.header.HashNoNonce()
@ -362,7 +360,6 @@ func (b *Block) WithMiningResult(nonce uint64, mixDigest common.Hash) *Block {
return &Block{
header: &cpy,
transactions: b.transactions,
receipts: b.receipts,
uncles: b.uncles,
}
}
@ -370,13 +367,13 @@ func (b *Block) WithMiningResult(nonce uint64, mixDigest common.Hash) *Block {
// WithBody returns a new block with the given transaction and uncle contents.
func (b *Block) WithBody(transactions []*Transaction, uncles []*Header) *Block {
block := &Block{
header: copyHeader(b.header),
header: CopyHeader(b.header),
transactions: make([]*Transaction, len(transactions)),
uncles: make([]*Header, len(uncles)),
}
copy(block.transactions, transactions)
for i := range uncles {
block.uncles[i] = copyHeader(uncles[i])
block.uncles[i] = CopyHeader(uncles[i])
}
return block
}

View File

@ -72,7 +72,7 @@ func (b Bloom) TestBytes(test []byte) bool {
func CreateBloom(receipts Receipts) Bloom {
bin := new(big.Int)
for _, receipt := range receipts {
bin.Or(bin, LogsBloom(receipt.logs))
bin.Or(bin, LogsBloom(receipt.Logs))
}
return BytesToBloom(bin.Bytes())

View File

@ -20,4 +20,6 @@ import "github.com/ethereum/go-ethereum/core/vm"
type BlockProcessor interface {
Process(*Block) (vm.Logs, Receipts, error)
ValidateHeader(*Header, bool, bool) error
ValidateHeaderWithParent(*Header, *Header, bool, bool) error
}

View File

@ -17,7 +17,6 @@
package types
import (
"bytes"
"fmt"
"io"
"math/big"
@ -27,89 +26,116 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)
// Receipt represents the results of a transaction.
type Receipt struct {
// Consensus fields
PostState []byte
CumulativeGasUsed *big.Int
Bloom Bloom
TxHash common.Hash
ContractAddress common.Address
logs vm.Logs
GasUsed *big.Int
Logs vm.Logs
// Implementation fields
TxHash common.Hash
ContractAddress common.Address
GasUsed *big.Int
}
func NewReceipt(root []byte, cumalativeGasUsed *big.Int) *Receipt {
return &Receipt{PostState: common.CopyBytes(root), CumulativeGasUsed: new(big.Int).Set(cumalativeGasUsed)}
// NewReceipt creates a barebone transaction receipt, copying the init fields.
func NewReceipt(root []byte, cumulativeGasUsed *big.Int) *Receipt {
return &Receipt{PostState: common.CopyBytes(root), CumulativeGasUsed: new(big.Int).Set(cumulativeGasUsed)}
}
func (self *Receipt) SetLogs(logs vm.Logs) {
self.logs = logs
// EncodeRLP implements rlp.Encoder, and flattens the consensus fields of a receipt
// into an RLP stream.
func (r *Receipt) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{r.PostState, r.CumulativeGasUsed, r.Bloom, r.Logs})
}
func (self *Receipt) Logs() vm.Logs {
return self.logs
// DecodeRLP implements rlp.Decoder, and loads the consensus fields of a receipt
// from an RLP stream.
func (r *Receipt) DecodeRLP(s *rlp.Stream) error {
var receipt struct {
PostState []byte
CumulativeGasUsed *big.Int
Bloom Bloom
Logs vm.Logs
}
if err := s.Decode(&receipt); err != nil {
return err
}
r.PostState, r.CumulativeGasUsed, r.Bloom, r.Logs = receipt.PostState, receipt.CumulativeGasUsed, receipt.Bloom, receipt.Logs
return nil
}
func (self *Receipt) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{self.PostState, self.CumulativeGasUsed, self.Bloom, self.logs})
// RlpEncode implements common.RlpEncode required for SHA3 derivation.
func (r *Receipt) RlpEncode() []byte {
bytes, err := rlp.EncodeToBytes(r)
if err != nil {
panic(err)
}
return bytes
}
func (self *Receipt) DecodeRLP(s *rlp.Stream) error {
var r struct {
// String implements the Stringer interface.
func (r *Receipt) String() string {
return fmt.Sprintf("receipt{med=%x cgas=%v bloom=%x logs=%v}", r.PostState, r.CumulativeGasUsed, r.Bloom, r.Logs)
}
// ReceiptForStorage is a wrapper around a Receipt that flattens and parses the
// entire content of a receipt, as opposed to only the consensus fields originally.
type ReceiptForStorage Receipt
// EncodeRLP implements rlp.Encoder, and flattens all content fields of a receipt
// into an RLP stream.
func (r *ReceiptForStorage) EncodeRLP(w io.Writer) error {
logs := make([]*vm.LogForStorage, len(r.Logs))
for i, log := range r.Logs {
logs[i] = (*vm.LogForStorage)(log)
}
return rlp.Encode(w, []interface{}{r.PostState, r.CumulativeGasUsed, r.Bloom, r.TxHash, r.ContractAddress, logs, r.GasUsed})
}
// DecodeRLP implements rlp.Decoder, and loads both consensus and implementation
// fields of a receipt from an RLP stream.
func (r *ReceiptForStorage) DecodeRLP(s *rlp.Stream) error {
var receipt struct {
PostState []byte
CumulativeGasUsed *big.Int
Bloom Bloom
TxHash common.Hash
ContractAddress common.Address
Logs vm.Logs
Logs []*vm.LogForStorage
GasUsed *big.Int
}
if err := s.Decode(&r); err != nil {
if err := s.Decode(&receipt); err != nil {
return err
}
self.PostState, self.CumulativeGasUsed, self.Bloom, self.TxHash, self.ContractAddress, self.logs, self.GasUsed = r.PostState, r.CumulativeGasUsed, r.Bloom, r.TxHash, r.ContractAddress, r.Logs, r.GasUsed
// Assign the consensus fields
r.PostState, r.CumulativeGasUsed, r.Bloom = receipt.PostState, receipt.CumulativeGasUsed, receipt.Bloom
r.Logs = make(vm.Logs, len(receipt.Logs))
for i, log := range receipt.Logs {
r.Logs[i] = (*vm.Log)(log)
}
// Assign the implementation fields
r.TxHash, r.ContractAddress, r.GasUsed = receipt.TxHash, receipt.ContractAddress, receipt.GasUsed
return nil
}
type ReceiptForStorage Receipt
func (self *ReceiptForStorage) EncodeRLP(w io.Writer) error {
storageLogs := make([]*vm.LogForStorage, len(self.logs))
for i, log := range self.logs {
storageLogs[i] = (*vm.LogForStorage)(log)
}
return rlp.Encode(w, []interface{}{self.PostState, self.CumulativeGasUsed, self.Bloom, self.TxHash, self.ContractAddress, storageLogs, self.GasUsed})
}
func (self *Receipt) RlpEncode() []byte {
bytes, err := rlp.EncodeToBytes(self)
if err != nil {
fmt.Println("TMP -- RECEIPT ENCODE ERROR", err)
}
return bytes
}
func (self *Receipt) Cmp(other *Receipt) bool {
if bytes.Compare(self.PostState, other.PostState) != 0 {
return false
}
return true
}
func (self *Receipt) String() string {
return fmt.Sprintf("receipt{med=%x cgas=%v bloom=%x logs=%v}", self.PostState, self.CumulativeGasUsed, self.Bloom, self.logs)
}
// Receipts is a wrapper around a Receipt array to implement types.DerivableList.
type Receipts []*Receipt
func (self Receipts) RlpEncode() []byte {
bytes, err := rlp.EncodeToBytes(self)
// RlpEncode implements common.RlpEncode required for SHA3 derivation.
func (r Receipts) RlpEncode() []byte {
bytes, err := rlp.EncodeToBytes(r)
if err != nil {
fmt.Println("TMP -- RECEIPTS ENCODE ERROR", err)
panic(err)
}
return bytes
}
func (self Receipts) Len() int { return len(self) }
func (self Receipts) GetRlp(i int) []byte { return common.Rlp(self[i]) }
// Len returns the number of receipts in this list.
func (r Receipts) Len() int { return len(r) }
// GetRlp returns the RLP encoding of one receipt from the list.
func (r Receipts) GetRlp(i int) []byte { return common.Rlp(r[i]) }

View File

@ -25,42 +25,47 @@ import (
)
type Log struct {
// Consensus fields
Address common.Address
Topics []common.Hash
Data []byte
Number uint64
TxHash common.Hash
TxIndex uint
BlockHash common.Hash
Index uint
// Derived fields (don't reorder!)
BlockNumber uint64
TxHash common.Hash
TxIndex uint
BlockHash common.Hash
Index uint
}
func NewLog(address common.Address, topics []common.Hash, data []byte, number uint64) *Log {
return &Log{Address: address, Topics: topics, Data: data, Number: number}
return &Log{Address: address, Topics: topics, Data: data, BlockNumber: number}
}
func (self *Log) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{self.Address, self.Topics, self.Data})
func (l *Log) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{l.Address, l.Topics, l.Data})
}
func (self *Log) String() string {
return fmt.Sprintf(`log: %x %x %x %x %d %x %d`, self.Address, self.Topics, self.Data, self.TxHash, self.TxIndex, self.BlockHash, self.Index)
func (l *Log) DecodeRLP(s *rlp.Stream) error {
var log struct {
Address common.Address
Topics []common.Hash
Data []byte
}
if err := s.Decode(&log); err != nil {
return err
}
l.Address, l.Topics, l.Data = log.Address, log.Topics, log.Data
return nil
}
func (l *Log) String() string {
return fmt.Sprintf(`log: %x %x %x %x %d %x %d`, l.Address, l.Topics, l.Data, l.TxHash, l.TxIndex, l.BlockHash, l.Index)
}
type Logs []*Log
// LogForStorage is a wrapper around a Log that flattens and parses the entire
// content of a log, as opposed to only the consensus fields originally (by hiding
// the rlp interface methods).
type LogForStorage Log
func (self *LogForStorage) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{
self.Address,
self.Topics,
self.Data,
self.Number,
self.TxHash,
self.TxIndex,
self.BlockHash,
self.Index,
})
}