consensus, miner: stale block mining support (#17506)

* consensus, miner: stale block support

* consensus, miner: refactor seal signature

* cmd, consensus, eth: add miner noverify flag

* cmd, consensus, miner: polish
Author:    gary rong
Date:      2018-08-28 21:59:05 +08:00
Committer: Péter Szilágyi
Parent:    63352bf424
Commit:    c1c003e4ff
16 changed files with 317 additions and 183 deletions
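
The headline change is the consensus.Engine sealing API: Seal no longer blocks and returns the sealed block itself; it now takes a results channel and a stop channel, returns promptly, and delivers the sealed block asynchronously, which is why the worker's resultCh below carries *types.Block instead of *task. A minimal sketch of the refactored methods as implied by the call sites in this diff (the parameter names and the ChainReader type are filled in from context, not shown in this excerpt):

    // Sketch only: the sealing-related methods of consensus.Engine after this refactor.
    type Engine interface {
        // Seal starts sealing the given block asynchronously and sends the result
        // on the results channel, aborting early if the stop channel is closed.
        Seal(chain ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error

        // SealHash returns the hash of a block prior to it being sealed; the
        // worker uses it as the key into its pendingTasks map.
        SealHash(header *types.Header) common.Hash

        // ... remaining Engine methods are unchanged by this commit
    }
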

miner/worker.go

@@ -73,7 +73,7 @@ const (
     // increasing upper limit or decreasing lower limit so that the limit can be reachable.
     intervalAdjustBias = 200 * 1000.0 * 1000.0

-    // staleThreshold is the maximum distance of the acceptable stale block.
+    // staleThreshold is the maximum depth of the acceptable stale block.
     staleThreshold = 7
 )
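
staleThreshold bounds how far behind the canonical head a block may fall while its sealing result is still accepted, which is the heart of the stale-block support. As an illustration only (this helper is not part of the excerpt; its name and exact condition are assumptions), a depth check of this kind over the worker's pendingTasks map might look like:

    // clearStale drops pending sealing tasks that have fallen more than
    // staleThreshold blocks behind the given chain height (illustrative sketch).
    func (w *worker) clearStale(height uint64) {
        w.pendingMu.Lock()
        defer w.pendingMu.Unlock()

        for sealhash, task := range w.pendingTasks {
            if task.block.NumberU64()+staleThreshold <= height {
                delete(w.pendingTasks, sealhash)
            }
        }
    }
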
@@ -139,7 +139,7 @@ type worker struct {
     // Channels
     newWorkCh chan *newWorkReq
     taskCh chan *task
-    resultCh chan *task
+    resultCh chan *types.Block
     startCh chan struct{}
     exitCh chan struct{}
     resubmitIntervalCh chan time.Duration
@@ -186,7 +186,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend,
         chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
         newWorkCh: make(chan *newWorkReq),
         taskCh: make(chan *task),
-        resultCh: make(chan *task, resultQueueSize),
+        resultCh: make(chan *types.Block, resultQueueSize),
         exitCh: make(chan struct{}),
         startCh: make(chan struct{}, 1),
         resubmitIntervalCh: make(chan time.Duration),
@@ -269,18 +269,10 @@ func (w *worker) isRunning() bool {
     return atomic.LoadInt32(&w.running) == 1
 }

-// close terminates all background threads maintained by the worker and cleans up buffered channels.
+// close terminates all background threads maintained by the worker.
 // Note the worker does not support being closed multiple times.
 func (w *worker) close() {
     close(w.exitCh)
-    // Clean up buffered channels
-    for empty := false; !empty; {
-        select {
-        case <-w.resultCh:
-        default:
-            empty = true
-        }
-    }
 }

 // newWorkLoop is a standalone goroutine to submit new mining work upon received events.
@@ -471,42 +463,6 @@ func (w *worker) mainLoop() {
     }
 }

-// seal pushes a sealing task to consensus engine and submits the result.
-func (w *worker) seal(t *task, stop <-chan struct{}) {
-    if w.skipSealHook != nil && w.skipSealHook(t) {
-        return
-    }
-    // The reason for caching task first is:
-    // A previous sealing action will be canceled by subsequent actions,
-    // however, remote miner may submit a result based on the cancelled task.
-    // So we should only submit the pending state corresponding to the seal result.
-    // TODO(rjl493456442) Replace the seal-wait logic structure
-    w.pendingMu.Lock()
-    w.pendingTasks[w.engine.SealHash(t.block.Header())] = t
-    w.pendingMu.Unlock()
-
-    if block, err := w.engine.Seal(w.chain, t.block, stop); block != nil {
-        sealhash := w.engine.SealHash(block.Header())
-        w.pendingMu.RLock()
-        task, exist := w.pendingTasks[sealhash]
-        w.pendingMu.RUnlock()
-        if !exist {
-            log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash())
-            return
-        }
-        // Assemble sealing result
-        task.block = block
-        log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", block.Hash(),
-            "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
-        select {
-        case w.resultCh <- task:
-        case <-w.exitCh:
-        }
-    } else if err != nil {
-        log.Warn("Block sealing failed", "err", err)
-    }
-}
-
 // taskLoop is a standalone goroutine to fetch sealing task from the generator and
 // push them to consensus engine.
 func (w *worker) taskLoop() {
@@ -533,10 +489,20 @@ func (w *worker) taskLoop() {
             if sealHash == prev {
                 continue
             }
+            // Interrupt previous sealing operation
             interrupt()
-            stopCh = make(chan struct{})
-            prev = sealHash
-            go w.seal(task, stopCh)
+            stopCh, prev = make(chan struct{}), sealHash
+
+            if w.skipSealHook != nil && w.skipSealHook(task) {
+                continue
+            }
+            w.pendingMu.Lock()
+            w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
+            w.pendingMu.Unlock()
+
+            if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
+                log.Warn("Block sealing failed", "err", err)
+            }
         case <-w.exitCh:
             interrupt()
             return
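
With the worker.seal wrapper gone, taskLoop only registers the task under its seal hash and hands the shared resultCh straight to the engine, which now owns the background sealing goroutine. A rough sketch of how an engine implementation might satisfy the new contract (the fakeEngine type and its mine helper are made up here; the real ethash and clique sealers do considerably more):

    // Seal returns immediately and reports the sealed block, if any, on results.
    func (e *fakeEngine) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
        go func() {
            sealed, err := e.mine(block, stop) // long-running search, aborts when stop closes
            if err != nil || sealed == nil {
                return // aborted or failed: nothing is delivered
            }
            select {
            case results <- sealed: // picked up by the worker's resultLoop
            case <-stop:
            }
        }()
        return nil
    }
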
@@ -549,38 +515,54 @@
 func (w *worker) resultLoop() {
     for {
         select {
-        case result := <-w.resultCh:
+        case block := <-w.resultCh:
             // Short circuit when receiving empty result.
-            if result == nil {
+            if block == nil {
                 continue
             }
             // Short circuit when receiving duplicate result caused by resubmitting.
-            block := result.block
             if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
                 continue
             }
-            // Update the block hash in all logs since it is now available and not when the
-            // receipt/log of individual transactions were created.
-            for _, r := range result.receipts {
-                for _, l := range r.Logs {
-                    l.BlockHash = block.Hash()
-                }
-            }
+            var (
+                sealhash = w.engine.SealHash(block.Header())
+                hash = block.Hash()
+            )
+            w.pendingMu.RLock()
+            task, exist := w.pendingTasks[sealhash]
+            w.pendingMu.RUnlock()
+            if !exist {
+                log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
+                continue
+            }
-            for _, log := range result.state.Logs() {
-                log.BlockHash = block.Hash()
-            }
+            // Different block could share same sealhash, deep copy here to prevent write-write conflict.
+            var (
+                receipts = make([]*types.Receipt, len(task.receipts))
+                logs []*types.Log
+            )
+            for i, receipt := range task.receipts {
+                receipts[i] = new(types.Receipt)
+                *receipts[i] = *receipt
+
+                // Update the block hash in all logs since it is now available and not when the
+                // receipt/log of individual transactions were created.
+                for _, log := range receipt.Logs {
+                    log.BlockHash = hash
+                }
+                logs = append(logs, receipt.Logs...)
+            }
             // Commit block and state to database.
-            stat, err := w.chain.WriteBlockWithState(block, result.receipts, result.state)
+            stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
             if err != nil {
                 log.Error("Failed writing block to chain", "err", err)
                 continue
             }
+            log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
+                "elapsed", common.PrettyDuration(time.Since(task.createdAt)))
+
             // Broadcast the block and announce chain insertion event
             w.mux.Post(core.NewMinedBlockEvent{Block: block})
-            var (
-                events []interface{}
-                logs = result.state.Logs()
-            )
+            var events []interface{}
             switch stat {
             case core.CanonStatTy:
                 events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
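
The deep copy of the receipts exists because the seal hash deliberately ignores the seal fields of the header, so several different sealed blocks can resolve to the same pending task; with stale work now accepted, for example two remote miners submitting different valid nonces for the same work package, both results would otherwise share the task's receipt and log objects. A self-contained illustration of that collision, assuming engine is an ethash instance and work is an unsealed block from a task (the helper name is made up):

    // sameWorkTwoSolutions shows why two sealed blocks can key to one pending task:
    // ethash's SealHash covers the header without the nonce and mix digest, so two
    // solutions to the same work differ in block hash but not in seal hash.
    func sameWorkTwoSolutions(engine consensus.Engine, work *types.Block) {
        h1, h2 := types.CopyHeader(work.Header()), types.CopyHeader(work.Header())
        h1.Nonce, h2.Nonce = types.EncodeNonce(1), types.EncodeNonce(2)

        b1, b2 := work.WithSeal(h1), work.WithSeal(h2)

        fmt.Println(engine.SealHash(b1.Header()) == engine.SealHash(b2.Header())) // true: same pendingTasks entry
        fmt.Println(b1.Hash() == b2.Hash())                                       // false: two distinct blocks
    }

Because both such results map back to the same cached task, resultLoop clones the receipts before stamping the concrete block hash into the logs and handing them to WriteBlockWithState.
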