core, eth, rpc: split out block validator and state processor
This removes the burden on a single object to take care of all validation and state processing. Now instead the validation is done by the `core.BlockValidator` (`types.Validator`) that takes care of both header and uncle validation through the `ValidateBlock` method and state validation through the `ValidateState` method. The state processing is done by a new object `core.StateProcessor` (`types.Processor`) and accepts a new state as input and uses that to process the given block's transactions (and uncles for rewords) to calculate the state root for the next block (P_n + 1).
This commit is contained in:
@ -33,6 +33,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/core/vm"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
@ -61,17 +62,34 @@ const (
|
||||
blockCacheLimit = 256
|
||||
maxFutureBlocks = 256
|
||||
maxTimeFutureBlocks = 30
|
||||
// must be bumped when consensus algorithm is changed, this forces the upgradedb
|
||||
// command to be run (forces the blocks to be imported again using the new algorithm)
|
||||
BlockChainVersion = 3
|
||||
)
|
||||
|
||||
// BlockChain represents the canonical chain given a database with a genesis
|
||||
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
|
||||
//
|
||||
// Importing blocks in to the block chain happens according to the set of rules
|
||||
// defined by the two stage Validator. Processing of blocks is done using the
|
||||
// Processor which processes the included transaction. The validation of the state
|
||||
// is done in the second part of the Validator. Failing results in aborting of
|
||||
// the import.
|
||||
//
|
||||
// The BlockChain also helps in returning blocks from **any** chain included
|
||||
// in the database as well as blocks that represents the canonical chain. It's
|
||||
// important to note that GetBlock can return any block and does not need to be
|
||||
// included in the canonical one where as GetBlockByNumber always represents the
|
||||
// canonical chain.
|
||||
type BlockChain struct {
|
||||
chainDb ethdb.Database
|
||||
processor types.BlockProcessor
|
||||
eventMux *event.TypeMux
|
||||
genesisBlock *types.Block
|
||||
// Last known total difficulty
|
||||
mu sync.RWMutex
|
||||
chainmu sync.RWMutex
|
||||
tsmu sync.RWMutex
|
||||
procmu sync.RWMutex
|
||||
|
||||
checkpoint int // checkpoint counts towards the new checkpoint
|
||||
currentHeader *types.Header // Current head of the header chain (may be above the block chain!)
|
||||
@ -91,10 +109,15 @@ type BlockChain struct {
|
||||
procInterrupt int32 // interrupt signaler for block processing
|
||||
wg sync.WaitGroup
|
||||
|
||||
pow pow.PoW
|
||||
rand *mrand.Rand
|
||||
pow pow.PoW
|
||||
rand *mrand.Rand
|
||||
processor Processor
|
||||
validator Validator
|
||||
}
|
||||
|
||||
// NewBlockChain returns a fully initialised block chain using information
|
||||
// available in the database. It initialiser the default Ethereum Validator and
|
||||
// Processor.
|
||||
func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*BlockChain, error) {
|
||||
headerCache, _ := lru.New(headerCacheLimit)
|
||||
bodyCache, _ := lru.New(bodyCacheLimit)
|
||||
@ -121,6 +144,8 @@ func NewBlockChain(chainDb ethdb.Database, pow pow.PoW, mux *event.TypeMux) (*Bl
|
||||
return nil, err
|
||||
}
|
||||
bc.rand = mrand.New(mrand.NewSource(seed.Int64()))
|
||||
bc.SetValidator(NewBlockValidator(bc, pow))
|
||||
bc.SetProcessor(NewStateProcessor(bc))
|
||||
|
||||
bc.genesisBlock = bc.GetBlockByNumber(0)
|
||||
if bc.genesisBlock == nil {
|
||||
@ -292,6 +317,7 @@ func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GasLimit returns the gas limit of the current HEAD block.
|
||||
func (self *BlockChain) GasLimit() *big.Int {
|
||||
self.mu.RLock()
|
||||
defer self.mu.RUnlock()
|
||||
@ -299,6 +325,7 @@ func (self *BlockChain) GasLimit() *big.Int {
|
||||
return self.currentBlock.GasLimit()
|
||||
}
|
||||
|
||||
// LastBlockHash return the hash of the HEAD block.
|
||||
func (self *BlockChain) LastBlockHash() common.Hash {
|
||||
self.mu.RLock()
|
||||
defer self.mu.RUnlock()
|
||||
@ -333,6 +360,8 @@ func (self *BlockChain) CurrentFastBlock() *types.Block {
|
||||
return self.currentFastBlock
|
||||
}
|
||||
|
||||
// Status returns status information about the current chain such as the HEAD Td,
|
||||
// the HEAD hash and the hash of the genesis block.
|
||||
func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) {
|
||||
self.mu.RLock()
|
||||
defer self.mu.RUnlock()
|
||||
@ -340,10 +369,38 @@ func (self *BlockChain) Status() (td *big.Int, currentBlock common.Hash, genesis
|
||||
return self.GetTd(self.currentBlock.Hash()), self.currentBlock.Hash(), self.genesisBlock.Hash()
|
||||
}
|
||||
|
||||
func (self *BlockChain) SetProcessor(proc types.BlockProcessor) {
|
||||
self.processor = proc
|
||||
// SetProcessor sets the processor required for making state modifications.
|
||||
func (self *BlockChain) SetProcessor(processor Processor) {
|
||||
self.procmu.Lock()
|
||||
defer self.procmu.Unlock()
|
||||
self.processor = processor
|
||||
}
|
||||
|
||||
// SetValidator sets the validator which is used to validate incoming blocks.
|
||||
func (self *BlockChain) SetValidator(validator Validator) {
|
||||
self.procmu.Lock()
|
||||
defer self.procmu.Unlock()
|
||||
self.validator = validator
|
||||
}
|
||||
|
||||
// Validator returns the current validator.
|
||||
func (self *BlockChain) Validator() Validator {
|
||||
self.procmu.RLock()
|
||||
defer self.procmu.RUnlock()
|
||||
return self.validator
|
||||
}
|
||||
|
||||
// Processor returns the current processor.
|
||||
func (self *BlockChain) Processor() Processor {
|
||||
self.procmu.RLock()
|
||||
defer self.procmu.RUnlock()
|
||||
return self.processor
|
||||
}
|
||||
|
||||
// AuxValidator returns the auxiliary validator (Proof of work atm)
|
||||
func (self *BlockChain) AuxValidator() pow.PoW { return self.pow }
|
||||
|
||||
// State returns a new mutable state based on the current HEAD block.
|
||||
func (self *BlockChain) State() (*state.StateDB, error) {
|
||||
return state.New(self.CurrentBlock().Root(), self.chainDb)
|
||||
}
|
||||
@ -606,6 +663,8 @@ func (self *BlockChain) GetUnclesInChain(block *types.Block, length int) []*type
|
||||
return uncles
|
||||
}
|
||||
|
||||
// Stop stops the blockchain service. If any imports are currently in progress
|
||||
// it will abort them using the procInterrupt.
|
||||
func (bc *BlockChain) Stop() {
|
||||
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
|
||||
return
|
||||
@ -758,9 +817,9 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
|
||||
|
||||
var err error
|
||||
if index == 0 {
|
||||
err = self.processor.ValidateHeader(header, checkPow, false)
|
||||
err = self.Validator().ValidateHeader(header, self.GetHeader(header.ParentHash), checkPow)
|
||||
} else {
|
||||
err = self.processor.ValidateHeaderWithParent(header, chain[index-1], checkPow, false)
|
||||
err = self.Validator().ValidateHeader(header, chain[index-1], checkPow)
|
||||
}
|
||||
if err != nil {
|
||||
errs[index] = err
|
||||
@ -1025,9 +1084,10 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
|
||||
// faster than direct delivery and requires much less mutex
|
||||
// acquiring.
|
||||
var (
|
||||
stats struct{ queued, processed, ignored int }
|
||||
events = make([]interface{}, 0, len(chain))
|
||||
tstart = time.Now()
|
||||
stats struct{ queued, processed, ignored int }
|
||||
events = make([]interface{}, 0, len(chain))
|
||||
coalescedLogs vm.Logs
|
||||
tstart = time.Now()
|
||||
|
||||
nonceChecked = make([]bool, len(chain))
|
||||
)
|
||||
@ -1057,12 +1117,12 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
|
||||
|
||||
if BadHashes[block.Hash()] {
|
||||
err := BadHashError(block.Hash())
|
||||
blockErr(block, err)
|
||||
reportBlock(block, err)
|
||||
return i, err
|
||||
}
|
||||
// Call in to the block processor and check for errors. It's likely that if one block fails
|
||||
// all others will fail too (unless a known block is returned).
|
||||
logs, receipts, err := self.processor.Process(block)
|
||||
// Stage 1 validation of the block using the chain's validator
|
||||
// interface.
|
||||
err := self.Validator().ValidateBlock(block)
|
||||
if err != nil {
|
||||
if IsKnownBlockErr(err) {
|
||||
stats.ignored++
|
||||
@ -1089,14 +1149,41 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
blockErr(block, err)
|
||||
|
||||
go ReportBlock(block, err)
|
||||
reportBlock(block, err)
|
||||
|
||||
return i, err
|
||||
}
|
||||
|
||||
// Create a new statedb using the parent block and report an
|
||||
// error if it fails.
|
||||
statedb, err := state.New(self.GetBlock(block.ParentHash()).Root(), self.chainDb)
|
||||
if err != nil {
|
||||
reportBlock(block, err)
|
||||
return i, err
|
||||
}
|
||||
// Process block using the parent state as reference point.
|
||||
receipts, logs, usedGas, err := self.processor.Process(block, statedb)
|
||||
if err != nil {
|
||||
reportBlock(block, err)
|
||||
return i, err
|
||||
}
|
||||
// Validate the state using the default validator
|
||||
err = self.Validator().ValidateState(block, self.GetBlock(block.ParentHash()), statedb, receipts, usedGas)
|
||||
if err != nil {
|
||||
reportBlock(block, err)
|
||||
return i, err
|
||||
}
|
||||
// Write state changes to database
|
||||
_, err = statedb.Commit()
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
|
||||
// coalesce logs for later processing
|
||||
coalescedLogs = append(coalescedLogs, logs...)
|
||||
|
||||
if err := PutBlockReceipts(self.chainDb, block.Hash(), receipts); err != nil {
|
||||
glog.V(logger.Warn).Infoln("error writing block receipts:", err)
|
||||
return i, err
|
||||
}
|
||||
|
||||
txcount += len(block.Transactions())
|
||||
@ -1105,6 +1192,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
|
||||
switch status {
|
||||
case CanonStatTy:
|
||||
if glog.V(logger.Debug) {
|
||||
@ -1141,7 +1229,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
|
||||
start, end := chain[0], chain[len(chain)-1]
|
||||
glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4])
|
||||
}
|
||||
go self.postChainEvents(events)
|
||||
go self.postChainEvents(events, coalescedLogs)
|
||||
|
||||
return 0, nil
|
||||
}
|
||||
@ -1239,7 +1327,9 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
|
||||
|
||||
// postChainEvents iterates over the events generated by a chain insertion and
|
||||
// posts them into the event mux.
|
||||
func (self *BlockChain) postChainEvents(events []interface{}) {
|
||||
func (self *BlockChain) postChainEvents(events []interface{}, logs vm.Logs) {
|
||||
// post event logs for further processing
|
||||
self.eventMux.Post(logs)
|
||||
for _, event := range events {
|
||||
if event, ok := event.(ChainEvent); ok {
|
||||
// We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
|
||||
@ -1265,9 +1355,13 @@ func (self *BlockChain) update() {
|
||||
}
|
||||
}
|
||||
|
||||
func blockErr(block *types.Block, err error) {
|
||||
// reportBlock reports the given block and error using the canonical block
|
||||
// reporting tool. Reporting the block to the service is handled in a separate
|
||||
// goroutine.
|
||||
func reportBlock(block *types.Block, err error) {
|
||||
if glog.V(logger.Error) {
|
||||
glog.Errorf("Bad block #%v (%s)\n", block.Number(), block.Hash().Hex())
|
||||
glog.Errorf(" %v", err)
|
||||
}
|
||||
go ReportBlock(block, err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user