Merge pull request #2523 from fjl/shutdown
core, eth, miner: improve shutdown synchronisation
@@ -416,6 +416,7 @@ func (s *Ethereum) Stop() error {
     s.blockchain.Stop()
     s.protocolManager.Stop()
     s.txPool.Stop()
+    s.miner.Stop()
     s.eventMux.Stop()

     s.StopAutoDAG()
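Note on the pattern behind this hunk: each subsystem owns its goroutines, and its Stop method is expected to block until they have returned, so the caller can simply stop the subsystems in sequence. A minimal standalone sketch of that idiom (the service type and names below are illustrative, not go-ethereum code):

package main

import (
    "fmt"
    "sync"
)

// service owns one worker goroutine and shuts it down synchronously.
type service struct {
    quit chan struct{}
    wg   sync.WaitGroup
}

func newService() *service {
    s := &service{quit: make(chan struct{})}
    s.wg.Add(1)
    go s.loop()
    return s
}

func (s *service) loop() {
    defer s.wg.Done()
    for {
        select {
        case <-s.quit:
            return
            // a real loop would also select on its work channels here
        }
    }
}

// Stop signals the loop and blocks until it has exited.
func (s *service) Stop() {
    close(s.quit)
    s.wg.Wait()
}

func main() {
    a, b := newService(), newService()
    // Stopping in sequence is safe because each Stop only returns
    // once that service's goroutine is gone.
    b.Stop()
    a.Stop()
    fmt.Println("all services stopped")
}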
@@ -74,14 +74,14 @@ type ProtocolManager struct {
     minedBlockSub event.Subscription

     // channels for fetcher, syncer, txsyncLoop
-    newPeerCh chan *peer
-    txsyncCh  chan *txsync
-    quitSync  chan struct{}
+    newPeerCh   chan *peer
+    txsyncCh    chan *txsync
+    quitSync    chan struct{}
+    noMorePeers chan struct{}

     // wait group is used for graceful shutdowns during downloading
     // and processing
-    wg   sync.WaitGroup
-    quit bool
+    wg sync.WaitGroup
 }

 // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -94,16 +94,17 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
     }
     // Create the protocol manager with the base fields
     manager := &ProtocolManager{
-        networkId:  networkId,
-        fastSync:   fastSync,
-        eventMux:   mux,
-        txpool:     txpool,
-        blockchain: blockchain,
-        chaindb:    chaindb,
-        peers:      newPeerSet(),
-        newPeerCh:  make(chan *peer, 1),
-        txsyncCh:   make(chan *txsync),
-        quitSync:   make(chan struct{}),
+        networkId:   networkId,
+        fastSync:    fastSync,
+        eventMux:    mux,
+        txpool:      txpool,
+        blockchain:  blockchain,
+        chaindb:     chaindb,
+        peers:       newPeerSet(),
+        newPeerCh:   make(chan *peer),
+        noMorePeers: make(chan struct{}),
+        txsyncCh:    make(chan *txsync),
+        quitSync:    make(chan struct{}),
     }
     // Initiate a sub-protocol for every implemented version we can handle
     manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
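The buffer on newPeerCh is dropped here (make(chan *peer, 1) becomes make(chan *peer)), so handing a peer to the sync loop becomes a rendezvous rather than a fire-and-forget send; that is what makes the select in the Run callback below and the noMorePeers handshake meaningful. A small illustration of the difference, using throwaway channel names:

package main

import "fmt"

func main() {
    // Buffered: the send succeeds even though nobody is receiving, so the
    // sender learns nothing about the receiver's state.
    buffered := make(chan int, 1)
    buffered <- 1
    fmt.Println("buffered send completed without a receiver")

    // Unbuffered: the send is a rendezvous and completes only when a
    // receiver takes the value; probed here with a non-blocking select.
    unbuffered := make(chan int)
    select {
    case unbuffered <- 1:
        fmt.Println("not reached: no receiver is ready")
    default:
        fmt.Println("unbuffered send would block until a receiver arrives")
    }
}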
@@ -120,8 +121,14 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int,
             Length:  ProtocolLengths[i],
             Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                 peer := manager.newPeer(int(version), p, rw)
-                manager.newPeerCh <- peer
-                return manager.handle(peer)
+                select {
+                case manager.newPeerCh <- peer:
+                    manager.wg.Add(1)
+                    defer manager.wg.Done()
+                    return manager.handle(peer)
+                case <-manager.quitSync:
+                    return p2p.DiscQuitting
+                }
             },
             NodeInfo: func() interface{} {
                 return manager.NodeInfo()
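The select above is the admission gate: a session is handed to handle only if the manager is still accepting peers, and the WaitGroup entry is taken on the same code path, so a later wg.Wait covers exactly the handlers that were admitted. A standalone sketch of the gate, with made-up names (acceptCh, quit, handle) standing in for newPeerCh, quitSync and pm.handle:

package main

import (
    "fmt"
    "sync"
)

// manager mimics the admission gate; acceptCh, quit and handle are
// illustrative stand-ins, not the go-ethereum types.
type manager struct {
    acceptCh chan int
    quit     chan struct{}
    wg       sync.WaitGroup
}

func (m *manager) run(id int) error {
    select {
    case m.acceptCh <- id:
        // Admitted: join the WaitGroup on the same path, so a later
        // wg.Wait() covers exactly the admitted handlers.
        m.wg.Add(1)
        defer m.wg.Done()
        return m.handle(id)
    case <-m.quit:
        // Shutting down: refuse the session instead of blocking forever.
        return fmt.Errorf("quitting")
    }
}

func (m *manager) handle(id int) error { return nil }

func main() {
    m := &manager{acceptCh: make(chan int, 1), quit: make(chan struct{})}
    fmt.Println(m.run(1)) // admitted and handled: <nil>
    close(m.quit)
    fmt.Println(m.run(2)) // acceptCh is full and quit is closed: quitting
    m.wg.Wait()
}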
@@ -187,16 +194,25 @@ func (pm *ProtocolManager) Start() {
 }

 func (pm *ProtocolManager) Stop() {
-    // Showing a log message. During download / process this could actually
-    // take between 5 to 10 seconds and therefor feedback is required.
     glog.V(logger.Info).Infoln("Stopping ethereum protocol handler...")

-    pm.quit = true
     pm.txSub.Unsubscribe()         // quits txBroadcastLoop
     pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
-    close(pm.quitSync) // quits syncer, fetcher, txsyncLoop

-    // Wait for any process action
+    // Quit the sync loop.
+    // After this send has completed, no new peers will be accepted.
+    pm.noMorePeers <- struct{}{}
+
+    // Quit fetcher, txsyncLoop.
+    close(pm.quitSync)
+
+    // Disconnect existing sessions.
+    // This also closes the gate for any new registrations on the peer set.
+    // sessions which are already established but not added to pm.peers yet
+    // will exit when they try to register.
+    pm.peers.Close()
+
+    // Wait for all peer handler goroutines and the loops to come down.
     pm.wg.Wait()

     glog.V(logger.Info).Infoln("Ethereum protocol handler stopped")
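The ordering in Stop is the substance of the change: the blocking send on noMorePeers doubles as a handshake that stops new admissions, quitSync then cancels the remaining helper loops, peers.Close drops existing sessions and shuts the registration gate, and only then does wg.Wait run. A condensed, self-contained sketch of that handshake, with hypothetical names (newWork, noMore, quit):

package main

import (
    "fmt"
    "sync"
)

type pm struct {
    newWork chan int      // plays the role of newPeerCh
    noMore  chan struct{} // plays the role of noMorePeers
    quit    chan struct{} // plays the role of quitSync
    wg      sync.WaitGroup
}

// acceptLoop stands in for the syncer: it is the only receiver of newWork,
// and it stops admitting work the moment the noMore handshake arrives.
func (p *pm) acceptLoop() {
    defer p.wg.Done()
    for {
        select {
        case id := <-p.newWork:
            p.wg.Add(1)
            go func() { defer p.wg.Done(); _ = id }() // stands in for pm.handle
        case <-p.noMore:
            return
        }
    }
}

func (p *pm) stop() {
    // 1. Blocking send: once it completes, acceptLoop has taken the signal
    //    and no further work can be admitted.
    p.noMore <- struct{}{}
    // 2. Cancel any remaining helper loops (none in this sketch).
    close(p.quit)
    // 3. Wait for everything admitted before the handshake.
    p.wg.Wait()
    fmt.Println("stopped")
}

func main() {
    p := &pm{newWork: make(chan int), noMore: make(chan struct{}), quit: make(chan struct{})}
    p.wg.Add(1)
    go p.acceptLoop()
    p.newWork <- 1
    p.stop()
}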
@@ -140,14 +140,14 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
     // Start the peer on a new thread
     errc := make(chan error, 1)
     go func() {
-        pm.newPeerCh <- peer
-        errc <- pm.handle(peer)
+        select {
+        case pm.newPeerCh <- peer:
+            errc <- pm.handle(peer)
+        case <-pm.quitSync:
+            errc <- p2p.DiscQuitting
+        }
     }()
-    tp := &testPeer{
-        app:  app,
-        net:  net,
-        peer: peer,
-    }
+    tp := &testPeer{app: app, net: net, peer: peer}
     // Execute any implicitly requested handshakes and return
     if shake {
         td, head, genesis := pm.blockchain.Status()
eth/peer.go
@@ -34,6 +34,7 @@ import (
 )

 var (
+    errClosed            = errors.New("peer set is closed")
     errAlreadyRegistered = errors.New("peer is already registered")
     errNotRegistered     = errors.New("peer is not registered")
 )
@@ -351,8 +352,9 @@ func (p *peer) String() string {
 // peerSet represents the collection of active peers currently participating in
 // the Ethereum sub-protocol.
 type peerSet struct {
-    peers map[string]*peer
-    lock  sync.RWMutex
+    peers  map[string]*peer
+    lock   sync.RWMutex
+    closed bool
 }

 // newPeerSet creates a new peer set to track the active participants.
@@ -368,6 +370,9 @@ func (ps *peerSet) Register(p *peer) error {
     ps.lock.Lock()
     defer ps.lock.Unlock()

+    if ps.closed {
+        return errClosed
+    }
     if _, ok := ps.peers[p.id]; ok {
         return errAlreadyRegistered
     }
@@ -450,3 +455,15 @@ func (ps *peerSet) BestPeer() *peer {
     }
     return bestPeer
 }
+
+// Close disconnects all peers.
+// No new peers can be registered after Close has returned.
+func (ps *peerSet) Close() {
+    ps.lock.Lock()
+    defer ps.lock.Unlock()
+
+    for _, p := range ps.peers {
+        p.Disconnect(p2p.DiscQuitting)
+    }
+    ps.closed = true
+}
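Register and Close together form a closed-gate registry: the map and the closed flag share one mutex, so a session racing with shutdown is either already in the map (and gets disconnected by Close) or sees errClosed when it tries to register. A generic sketch of the same idiom, using a simplified registry type that is not part of go-ethereum:

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errClosed = errors.New("registry is closed")

type registry struct {
    mu     sync.Mutex
    items  map[string]func() // the value stands in for a peer's disconnect hook
    closed bool
}

func newRegistry() *registry {
    return &registry{items: make(map[string]func())}
}

// Register refuses new entries once the registry has been closed.
func (r *registry) Register(id string, disconnect func()) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.closed {
        return errClosed
    }
    r.items[id] = disconnect
    return nil
}

// Close disconnects every registered entry and shuts the gate for new ones.
func (r *registry) Close() {
    r.mu.Lock()
    defer r.mu.Unlock()
    for _, disconnect := range r.items {
        disconnect()
    }
    r.closed = true
}

func main() {
    r := newRegistry()
    _ = r.Register("a", func() { fmt.Println("disconnect a") })
    r.Close()
    fmt.Println(r.Register("b", nil)) // registry is closed
}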
@@ -148,7 +148,7 @@ func (pm *ProtocolManager) syncer() {
             // Force a sync even if not enough peers are present
             go pm.synchronise(pm.peers.BestPeer())

-        case <-pm.quitSync:
+        case <-pm.noMorePeers:
             return
         }
     }
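Switching the syncer's exit signal from quitSync to noMorePeers matters because of how the two are signalled: quitSync is closed (a broadcast that tells the closer nothing about its receivers), while noMorePeers is a plain send that only completes once the syncer has actually taken it. A tiny illustration of that difference, with throwaway channel names:

package main

import "fmt"

func main() {
    // A close() is a broadcast: it wakes every receiver but tells the sender
    // nothing about when (or whether) they actually observed it.
    broadcast := make(chan struct{})
    close(broadcast)

    // A plain send is a handshake: it only completes once exactly one
    // receiver has taken it, so the sender knows the receiver got there.
    handshake := make(chan struct{})
    done := make(chan struct{})
    go func() {
        <-handshake // the loop being stopped would return right after this
        close(done)
    }()
    handshake <- struct{}{} // blocks until the goroutine has received
    <-done
    fmt.Println("receiver has definitely seen the stop signal")
}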