eth: improve shutdown synchronization (#20695)
* eth: improve shutdown synchronization

Most goroutines started by eth.Ethereum didn't have any shutdown sync
at all, which led to weird error messages when quitting the client.
This change improves the clean shutdown path by stopping all internal
components in dependency order and waiting for them to actually be
stopped before shutdown is considered done. In particular, we now stop
everything related to peers before stopping 'resident' parts such as
core.BlockChain.

* eth: rewrite sync controller

* eth: remove sync start debug message

* eth: notify chainSyncer about new peers after handshake

* eth: move downloader.Cancel call into chainSyncer

* eth: make post-sync block broadcast synchronous

* eth: add comments

* core: change blockchain stop message

* eth: change closeBloomHandler channel type
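The core of the change is visible in the diff below: per-peer handler goroutines and 'resident' loops are tracked in separate wait groups and torn down in dependency order. A minimal, self-contained sketch of that pattern (simplified names, not the actual eth.Ethereum code):

package main

import "sync"

type manager struct {
	quit   chan struct{}  // closed to signal shutdown
	wg     sync.WaitGroup // resident loops: sync controller, broadcasters
	peerWG sync.WaitGroup // one entry per live peer handler
}

func (m *manager) start() {
	m.quit = make(chan struct{})
	m.wg.Add(1) // Add happens before the goroutine starts, never inside it
	go func() {
		defer m.wg.Done()
		<-m.quit // stand-in for a sync or broadcast loop body
	}()
}

func (m *manager) stop() {
	close(m.quit) // 1. tell resident loops to wind down, refuse new peers
	m.wg.Wait()   // 2. wait until those loops have actually exited
	// 3. disconnect peer sessions here (pm.peers.Close in the real code)
	m.peerWG.Wait() // 4. wait for every peer handler to return
}

func main() {
	m := new(manager)
	m.start()
	m.stop() // returns only after the loop goroutine is gone
}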
eth/handler.go

@@ -87,14 +87,12 @@ type ProtocolManager struct {
 
 	whitelist map[uint64]common.Hash
 
-	// channels for fetcher, syncer, txsyncLoop
-	newPeerCh   chan *peer
-	txsyncCh    chan *txsync
-	quitSync    chan struct{}
-	noMorePeers chan struct{}
+	txsyncCh chan *txsync
+	quitSync chan struct{}
 
-	// wait group is used for graceful shutdowns during downloading
-	// and processing
-	wg sync.WaitGroup
+	chainSync *chainSyncer
+	wg        sync.WaitGroup
+	peerWG    sync.WaitGroup
 
 	// Test fields or hooks
 	broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation
@@ -105,18 +103,17 @@ type ProtocolManager struct {
 func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
 	// Create the protocol manager with the base fields
 	manager := &ProtocolManager{
-		networkID:   networkID,
-		forkFilter:  forkid.NewFilter(blockchain),
-		eventMux:    mux,
-		txpool:      txpool,
-		blockchain:  blockchain,
-		peers:       newPeerSet(),
-		whitelist:   whitelist,
-		newPeerCh:   make(chan *peer),
-		noMorePeers: make(chan struct{}),
-		txsyncCh:    make(chan *txsync),
-		quitSync:    make(chan struct{}),
+		networkID:  networkID,
+		forkFilter: forkid.NewFilter(blockchain),
+		eventMux:   mux,
+		txpool:     txpool,
+		blockchain: blockchain,
+		peers:      newPeerSet(),
+		whitelist:  whitelist,
+		txsyncCh:   make(chan *txsync),
+		quitSync:   make(chan struct{}),
 	}
+
 	if mode == downloader.FullSync {
 		// The database seems empty as the current block is the genesis. Yet the fast
 		// block is ahead, so fast sync was enabled for this node at a certain point.
@@ -140,6 +137,7 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
 			manager.fastSync = uint32(1)
 		}
 	}
+
 	// If we have trusted checkpoints, enforce them on the chain
 	if checkpoint != nil {
 		manager.checkpointNumber = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
@@ -199,6 +197,8 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
 	}
 	manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
 
+	manager.chainSync = newChainSyncer(manager)
+
 	return manager, nil
 }
 
@@ -213,15 +213,7 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
 		Version: version,
 		Length:  length,
 		Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
-			peer := pm.newPeer(int(version), p, rw, pm.txpool.Get)
-			select {
-			case pm.newPeerCh <- peer:
-				pm.wg.Add(1)
-				defer pm.wg.Done()
-				return pm.handle(peer)
-			case <-pm.quitSync:
-				return p2p.DiscQuitting
-			}
+			return pm.runPeer(pm.newPeer(int(version), p, rw, pm.txpool.Get))
 		},
 		NodeInfo: func() interface{} {
 			return pm.NodeInfo()
@@ -260,40 +252,37 @@ func (pm *ProtocolManager) Start(maxPeers int) {
 	pm.maxPeers = maxPeers
 
 	// broadcast transactions
+	pm.wg.Add(1)
 	pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
 	pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
 	go pm.txBroadcastLoop()
 
 	// broadcast mined blocks
+	pm.wg.Add(1)
 	pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
 	go pm.minedBroadcastLoop()
 
 	// start sync handlers
-	go pm.syncer()
+	pm.wg.Add(2)
+	go pm.chainSync.loop()
 	go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
 }
 
 func (pm *ProtocolManager) Stop() {
-	log.Info("Stopping Ethereum protocol")
-
 	pm.txsSub.Unsubscribe()        // quits txBroadcastLoop
 	pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
 
-	// Quit the sync loop.
-	// After this send has completed, no new peers will be accepted.
-	pm.noMorePeers <- struct{}{}
-
-	// Quit fetcher, txsyncLoop.
+	// Quit chainSync and txsync64.
+	// After this is done, no new peers will be accepted.
 	close(pm.quitSync)
+	pm.wg.Wait()
 
 	// Disconnect existing sessions.
 	// This also closes the gate for any new registrations on the peer set.
 	// sessions which are already established but not added to pm.peers yet
 	// will exit when they try to register.
 	pm.peers.Close()
-
-	// Wait for all peer handler goroutines and the loops to come down.
-	pm.wg.Wait()
+	pm.peerWG.Wait()
 
 	log.Info("Ethereum protocol stopped")
 }
@@ -302,6 +291,15 @@ func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, ge
 	return newPeer(pv, p, rw, getPooledTx)
 }
 
+func (pm *ProtocolManager) runPeer(p *peer) error {
+	if !pm.chainSync.handlePeerEvent(p) {
+		return p2p.DiscQuitting
+	}
+	pm.peerWG.Add(1)
+	defer pm.peerWG.Done()
+	return pm.handle(p)
+}
+
 // handle is the callback invoked to manage the life cycle of an eth peer. When
 // this function terminates, the peer is disconnected.
 func (pm *ProtocolManager) handle(p *peer) error {
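runPeer only does pm.peerWG.Add(1) after chainSync.handlePeerEvent has accepted the peer, which is what keeps peerWG.Add from racing with peerWG.Wait in Stop. The diff shows handlePeerEvent's call sites but not its body; a sketch of the likely shape, assuming the event channel and the quit channel are raced in a select (the names chainSyncerSketch and peerEventCh are illustrative):

type chainSyncerSketch struct {
	peerEventCh chan struct{}
	quitSync    chan struct{}
}

// handlePeerEvent doubles as the admission gate: once quitSync is
// closed, no new peer can get past it, so no WaitGroup Add can be
// issued after Stop has started waiting.
func (cs *chainSyncerSketch) handlePeerEvent() bool {
	select {
	case cs.peerEventCh <- struct{}{}:
		return true // the sync loop is alive; the peer may register
	case <-cs.quitSync:
		return false // shutting down; runPeer returns p2p.DiscQuitting
	}
}

// loop is the single owner of sync decisions: peer events from handle
// and handleMsg funnel in here instead of spawning go pm.synchronise.
func (cs *chainSyncerSketch) loop() {
	for {
		select {
		case <-cs.peerEventCh:
			// re-evaluate whether a sync should start
		case <-cs.quitSync:
			// cancel any running download, then exit
			return
		}
	}
}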
@@ -323,6 +321,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
 		p.Log().Debug("Ethereum handshake failed", "err", err)
 		return err
 	}
+
 	// Register the peer locally
 	if err := pm.peers.Register(p); err != nil {
 		p.Log().Error("Ethereum peer registration failed", "err", err)
@@ -334,6 +333,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
 	if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
 		return err
 	}
+	pm.chainSync.handlePeerEvent(p)
+
 	// Propagate existing transactions. new transactions appearing
 	// after this will be sent via broadcasts.
 	pm.syncTransactions(p)
@@ -723,14 +724,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		// Update the peer's total difficulty if better than the previous
 		if _, td := p.Head(); trueTD.Cmp(td) > 0 {
 			p.SetHead(trueHead, trueTD)
-
-			// Schedule a sync if above ours. Note, this will not fire a sync for a gap of
-			// a single block (as the true TD is below the propagated block), however this
-			// scenario should easily be covered by the fetcher.
-			currentHeader := pm.blockchain.CurrentHeader()
-			if trueTD.Cmp(pm.blockchain.GetTd(currentHeader.Hash(), currentHeader.Number.Uint64())) > 0 {
-				go pm.synchronise(p)
-			}
+			pm.chainSync.handlePeerEvent(p)
 		}
 
 	case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65:
@@ -883,9 +877,10 @@ func (pm *ProtocolManager) BroadcastTransactions(txs types.Transactions, propaga
 		}
 	}
 
-// Mined broadcast loop
+// minedBroadcastLoop sends mined blocks to connected peers.
 func (pm *ProtocolManager) minedBroadcastLoop() {
-	// automatically stops if unsubscribe
+	defer pm.wg.Done()
+
 	for obj := range pm.minedBlockSub.Chan() {
 		if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
 			pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
@@ -894,7 +889,10 @@ func (pm *ProtocolManager) minedBroadcastLoop() {
 	}
 }
 
+// txBroadcastLoop announces new transactions to connected peers.
 func (pm *ProtocolManager) txBroadcastLoop() {
+	defer pm.wg.Done()
+
 	for {
 		select {
 		case event := <-pm.txsCh:
@@ -906,7 +904,6 @@ func (pm *ProtocolManager) txBroadcastLoop() {
 			pm.BroadcastTransactions(event.Txs, true)  // First propagate transactions to peers
 			pm.BroadcastTransactions(event.Txs, false) // Only then announce to the rest
 
-		// Err() channel will be closed when unsubscribing.
 		case <-pm.txsSub.Err():
 			return
 		}
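Both broadcast loops now pair a pm.wg.Add(1) in Start with a deferred pm.wg.Done, and they exit when their subscription is torn down: Unsubscribe in Stop closes minedBlockSub.Chan() and fires txsSub.Err(), so each loop returns and releases pm.wg.Wait(). A generic sketch of the select-based variant (stand-in channel types, not the event package API):

// broadcastLoop exits when errCh closes, which is what unsubscribing
// does to a subscription's Err channel; the deferred Done then lets
// Stop's wg.Wait proceed.
func broadcastLoop(wg *sync.WaitGroup, events <-chan int, errCh <-chan struct{}) {
	defer wg.Done() // pairs with wg.Add(1) before the goroutine starts
	for {
		select {
		case ev := <-events:
			_ = ev // broadcast the event to peers here
		case <-errCh:
			return
		}
	}
}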