eth/downloader: flush state sync data before exit (#16280)
This commit is contained in:
parent
0fac705ed0
commit
1100e8ba63
@ -274,15 +274,21 @@ func (s *stateSync) Cancel() error {
|
|||||||
// receive data from peers, rather those are buffered up in the downloader and
|
// receive data from peers, rather those are buffered up in the downloader and
|
||||||
// pushed here async. The reason is to decouple processing from data receipt
|
// pushed here async. The reason is to decouple processing from data receipt
|
||||||
// and timeouts.
|
// and timeouts.
|
||||||
func (s *stateSync) loop() error {
|
func (s *stateSync) loop() (err error) {
|
||||||
// Listen for new peer events to assign tasks to them
|
// Listen for new peer events to assign tasks to them
|
||||||
newPeer := make(chan *peerConnection, 1024)
|
newPeer := make(chan *peerConnection, 1024)
|
||||||
peerSub := s.d.peers.SubscribeNewPeers(newPeer)
|
peerSub := s.d.peers.SubscribeNewPeers(newPeer)
|
||||||
defer peerSub.Unsubscribe()
|
defer peerSub.Unsubscribe()
|
||||||
|
defer func() {
|
||||||
|
cerr := s.commit(true)
|
||||||
|
if err == nil {
|
||||||
|
err = cerr
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Keep assigning new tasks until the sync completes or aborts
|
// Keep assigning new tasks until the sync completes or aborts
|
||||||
for s.sched.Pending() > 0 {
|
for s.sched.Pending() > 0 {
|
||||||
if err := s.commit(false); err != nil {
|
if err = s.commit(false); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.assignTasks()
|
s.assignTasks()
|
||||||
@ -307,14 +313,14 @@ func (s *stateSync) loop() error {
|
|||||||
s.d.dropPeer(req.peer.id)
|
s.d.dropPeer(req.peer.id)
|
||||||
}
|
}
|
||||||
// Process all the received blobs and check for stale delivery
|
// Process all the received blobs and check for stale delivery
|
||||||
if err := s.process(req); err != nil {
|
if err = s.process(req); err != nil {
|
||||||
log.Warn("Node data write error", "err", err)
|
log.Warn("Node data write error", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
req.peer.SetNodeDataIdle(len(req.response))
|
req.peer.SetNodeDataIdle(len(req.response))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return s.commit(true)
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateSync) commit(force bool) error {
|
func (s *stateSync) commit(force bool) error {
|
||||||
@ -323,7 +329,9 @@ func (s *stateSync) commit(force bool) error {
|
|||||||
}
|
}
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
b := s.d.stateDB.NewBatch()
|
b := s.d.stateDB.NewBatch()
|
||||||
s.sched.Commit(b)
|
if written, err := s.sched.Commit(b); written == 0 || err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err := b.Write(); err != nil {
|
if err := b.Write(); err != nil {
|
||||||
return fmt.Errorf("DB write error: %v", err)
|
return fmt.Errorf("DB write error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -212,7 +212,7 @@ func (s *TrieSync) Process(results []SyncResult) (bool, int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Commit flushes the data stored in the internal membatch out to persistent
|
// Commit flushes the data stored in the internal membatch out to persistent
|
||||||
// storage, returning th enumber of items written and any occurred error.
|
// storage, returning the number of items written and any occurred error.
|
||||||
func (s *TrieSync) Commit(dbw ethdb.Putter) (int, error) {
|
func (s *TrieSync) Commit(dbw ethdb.Putter) (int, error) {
|
||||||
// Dump the membatch into a database dbw
|
// Dump the membatch into a database dbw
|
||||||
for i, key := range s.membatch.order {
|
for i, key := range s.membatch.order {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user