les: handler separation (#19639)

les: handler separation
This commit is contained in:
gary rong
2019-08-21 17:29:34 +08:00
committed by Felföldi Zsolt
parent 4aee0d1994
commit 2ed729d38e
31 changed files with 2377 additions and 2525 deletions

View File

@ -40,9 +40,8 @@ const (
// ODR system to ensure that we only request data related to a certain block from peers who have already processed
// and announced that block.
type lightFetcher struct {
pm *ProtocolManager
odr *LesOdr
chain lightChain
handler *clientHandler
chain *light.LightChain
lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests
maxConfirmedTd *big.Int
@ -58,13 +57,9 @@ type lightFetcher struct {
requestTriggered bool
requestTrigger chan struct{}
lastTrustedHeader *types.Header
}
// lightChain extends the BlockChain interface by locking.
type lightChain interface {
BlockChain
LockChain()
UnlockChain()
closeCh chan struct{}
wg sync.WaitGroup
}
// fetcherPeerInfo holds fetcher-specific information about each active peer
@ -114,32 +109,37 @@ type fetchResponse struct {
}
// newLightFetcher creates a new light fetcher
func newLightFetcher(pm *ProtocolManager) *lightFetcher {
func newLightFetcher(h *clientHandler) *lightFetcher {
f := &lightFetcher{
pm: pm,
chain: pm.blockchain.(*light.LightChain),
odr: pm.odr,
handler: h,
chain: h.backend.blockchain,
peers: make(map[*peer]*fetcherPeerInfo),
deliverChn: make(chan fetchResponse, 100),
requested: make(map[uint64]fetchRequest),
timeoutChn: make(chan uint64),
requestTrigger: make(chan struct{}, 1),
syncDone: make(chan *peer),
closeCh: make(chan struct{}),
maxConfirmedTd: big.NewInt(0),
}
pm.peers.notify(f)
h.backend.peers.notify(f)
f.pm.wg.Add(1)
f.wg.Add(1)
go f.syncLoop()
return f
}
func (f *lightFetcher) close() {
close(f.closeCh)
f.wg.Wait()
}
// syncLoop is the main event loop of the light fetcher
func (f *lightFetcher) syncLoop() {
defer f.pm.wg.Done()
defer f.wg.Done()
for {
select {
case <-f.pm.quitSync:
case <-f.closeCh:
return
// request loop keeps running until no further requests are necessary or possible
case <-f.requestTrigger:
@ -156,7 +156,7 @@ func (f *lightFetcher) syncLoop() {
f.lock.Unlock()
if rq != nil {
if _, ok := <-f.pm.reqDist.queue(rq); ok {
if _, ok := <-f.handler.backend.reqDist.queue(rq); ok {
if syncing {
f.lock.Lock()
f.syncing = true
@ -187,9 +187,9 @@ func (f *lightFetcher) syncLoop() {
}
f.reqMu.Unlock()
if ok {
f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
f.handler.backend.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true)
req.peer.Log().Debug("Fetching data timed out hard")
go f.pm.removePeer(req.peer.id)
go f.handler.removePeer(req.peer.id)
}
case resp := <-f.deliverChn:
f.reqMu.Lock()
@ -202,12 +202,12 @@ func (f *lightFetcher) syncLoop() {
}
f.reqMu.Unlock()
if ok {
f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
f.handler.backend.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout)
}
f.lock.Lock()
if !ok || !(f.syncing || f.processResponse(req, resp)) {
resp.peer.Log().Debug("Failed processing response")
go f.pm.removePeer(resp.peer.id)
go f.handler.removePeer(resp.peer.id)
}
f.lock.Unlock()
case p := <-f.syncDone:
@ -264,7 +264,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) {
if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 {
// announced tds should be strictly monotonic
p.Log().Debug("Received non-monotonic td", "current", head.Td, "previous", fp.lastAnnounced.td)
go f.pm.removePeer(p.id)
go f.handler.removePeer(p.id)
return
}
@ -297,7 +297,7 @@ func (f *lightFetcher) announce(p *peer, head *announceData) {
// if one of root's children is canonical, keep it, delete other branches and root itself
var newRoot *fetcherTreeNode
for i, nn := range fp.root.children {
if rawdb.ReadCanonicalHash(f.pm.chainDb, nn.number) == nn.hash {
if rawdb.ReadCanonicalHash(f.handler.backend.chainDb, nn.number) == nn.hash {
fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...)
nn.parent = nil
newRoot = nn
@ -390,7 +390,7 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64, ha
//
// when syncing, just check if it is part of the known chain, there is nothing better we
// can do since we do not know the most recent block hash yet
return rawdb.ReadCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && rawdb.ReadCanonicalHash(f.pm.chainDb, number) == hash
return rawdb.ReadCanonicalHash(f.handler.backend.chainDb, fp.root.number) == fp.root.hash && rawdb.ReadCanonicalHash(f.handler.backend.chainDb, number) == hash
}
// requestAmount calculates the amount of headers to be downloaded starting
@ -453,8 +453,7 @@ func (f *lightFetcher) findBestRequest() (bestHash common.Hash, bestAmount uint6
if f.checkKnownNode(p, n) || n.requested {
continue
}
//if ulc mode is disabled, isTrustedHash returns true
// if ulc mode is disabled, isTrustedHash returns true
amount := f.requestAmount(p, n)
if (bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount) && (f.isTrustedHash(hash) || f.maxConfirmedTd.Int64() == 0) {
bestHash = hash
@ -470,7 +469,7 @@ func (f *lightFetcher) findBestRequest() (bestHash common.Hash, bestAmount uint6
// isTrustedHash checks if the block can be trusted by the minimum trusted fraction.
func (f *lightFetcher) isTrustedHash(hash common.Hash) bool {
// If ultra light cliet mode is disabled, trust all hashes
if f.pm.ulc == nil {
if f.handler.ulc == nil {
return true
}
// Ultra light enabled, only trust after enough confirmations
@ -480,7 +479,7 @@ func (f *lightFetcher) isTrustedHash(hash common.Hash) bool {
agreed++
}
}
return 100*agreed/len(f.pm.ulc.keys) >= f.pm.ulc.fraction
return 100*agreed/len(f.handler.ulc.keys) >= f.handler.ulc.fraction
}
func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq {
@ -500,14 +499,14 @@ func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq {
return fp != nil && fp.nodeByHash[bestHash] != nil
},
request: func(dp distPeer) func() {
if f.pm.ulc != nil {
if f.handler.ulc != nil {
// Keep last trusted header before sync
f.setLastTrustedHeader(f.chain.CurrentHeader())
}
go func() {
p := dp.(*peer)
p.Log().Debug("Synchronisation started")
f.pm.synchronise(p)
f.handler.synchronise(p)
f.syncDone <- p
}()
return nil
@ -607,7 +606,7 @@ func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) {
for p, fp := range f.peers {
if !f.checkAnnouncedHeaders(fp, headers, tds) {
p.Log().Debug("Inconsistent announcement")
go f.pm.removePeer(p.id)
go f.handler.removePeer(p.id)
}
if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) {
maxTd = fp.confirmedTd
@ -705,7 +704,7 @@ func (f *lightFetcher) checkSyncedHeaders(p *peer) {
node = fp.lastAnnounced
td *big.Int
)
if f.pm.ulc != nil {
if f.handler.ulc != nil {
// Roll back untrusted blocks
h, unapproved := f.lastTrustedTreeNode(p)
f.chain.Rollback(unapproved)
@ -721,7 +720,7 @@ func (f *lightFetcher) checkSyncedHeaders(p *peer) {
// Now node is the latest downloaded/approved header after syncing
if node == nil {
p.Log().Debug("Synchronisation failed")
go f.pm.removePeer(p.id)
go f.handler.removePeer(p.id)
return
}
header := f.chain.GetHeader(node.hash, node.number)
@ -741,7 +740,7 @@ func (f *lightFetcher) lastTrustedTreeNode(p *peer) (*types.Header, []common.Has
if canonical.Number.Uint64() > f.lastTrustedHeader.Number.Uint64() {
canonical = f.chain.GetHeaderByNumber(f.lastTrustedHeader.Number.Uint64())
}
commonAncestor := rawdb.FindCommonAncestor(f.pm.chainDb, canonical, f.lastTrustedHeader)
commonAncestor := rawdb.FindCommonAncestor(f.handler.backend.chainDb, canonical, f.lastTrustedHeader)
if commonAncestor == nil {
log.Error("Common ancestor of last trusted header and canonical header is nil", "canonical hash", canonical.Hash(), "trusted hash", f.lastTrustedHeader.Hash())
return current, unapprovedHashes
@ -787,7 +786,7 @@ func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool {
}
if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) {
p.Log().Debug("Inconsistent announcement")
go f.pm.removePeer(p.id)
go f.handler.removePeer(p.id)
}
if fp.confirmedTd != nil {
f.updateMaxConfirmedTd(fp.confirmedTd)
@ -880,12 +879,12 @@ func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) {
fp.firstUpdateStats = newEntry
}
for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) {
f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
f.handler.backend.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout)
fp.firstUpdateStats = fp.firstUpdateStats.next
}
if fp.confirmedTd != nil {
for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 {
f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
f.handler.backend.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time))
fp.firstUpdateStats = fp.firstUpdateStats.next
}
}