les: fix retriever logic (#16776)
This PR fixes a retriever logic bug. When a peer had a soft timeout and then a response arrived, it always assumed it was the same peer even though it could have been a later requested one that did not time out at all yet. In this case the logic went to an illegal state and deadlocked, causing a goroutine leak. Fixes #16243 and replaces #16359. Thanks to @riceke for finding the bug in the logic.
This commit is contained in:
		
				
					committed by
					
						
						Felix Lange
					
				
			
			
				
	
			
			
			
						parent
						
							049f5b3572
						
					
				
				
					commit
					25982375a8
				
			@@ -69,8 +69,8 @@ type sentReq struct {
 | 
				
			|||||||
	lock   sync.RWMutex // protect access to sentTo map
 | 
						lock   sync.RWMutex // protect access to sentTo map
 | 
				
			||||||
	sentTo map[distPeer]sentReqToPeer
 | 
						sentTo map[distPeer]sentReqToPeer
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	reqQueued    bool // a request has been queued but not sent
 | 
						lastReqQueued bool     // last request has been queued but not sent
 | 
				
			||||||
	reqSent      bool // a request has been sent but not timed out
 | 
						lastReqSentTo distPeer // if not nil then last request has been sent to given peer but not timed out
 | 
				
			||||||
	reqSrtoCount  int      // number of requests that reached soft (but not hard) timeout
 | 
						reqSrtoCount  int      // number of requests that reached soft (but not hard) timeout
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -180,7 +180,7 @@ type reqStateFn func() reqStateFn
 | 
				
			|||||||
// retrieveLoop is the retrieval state machine event loop
 | 
					// retrieveLoop is the retrieval state machine event loop
 | 
				
			||||||
func (r *sentReq) retrieveLoop() {
 | 
					func (r *sentReq) retrieveLoop() {
 | 
				
			||||||
	go r.tryRequest()
 | 
						go r.tryRequest()
 | 
				
			||||||
	r.reqQueued = true
 | 
						r.lastReqQueued = true
 | 
				
			||||||
	state := r.stateRequesting
 | 
						state := r.stateRequesting
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for state != nil {
 | 
						for state != nil {
 | 
				
			||||||
@@ -214,7 +214,7 @@ func (r *sentReq) stateRequesting() reqStateFn {
 | 
				
			|||||||
		case rpSoftTimeout:
 | 
							case rpSoftTimeout:
 | 
				
			||||||
			// last request timed out, try asking a new peer
 | 
								// last request timed out, try asking a new peer
 | 
				
			||||||
			go r.tryRequest()
 | 
								go r.tryRequest()
 | 
				
			||||||
			r.reqQueued = true
 | 
								r.lastReqQueued = true
 | 
				
			||||||
			return r.stateRequesting
 | 
								return r.stateRequesting
 | 
				
			||||||
		case rpDeliveredValid:
 | 
							case rpDeliveredValid:
 | 
				
			||||||
			r.stop(nil)
 | 
								r.stop(nil)
 | 
				
			||||||
@@ -233,7 +233,7 @@ func (r *sentReq) stateNoMorePeers() reqStateFn {
 | 
				
			|||||||
	select {
 | 
						select {
 | 
				
			||||||
	case <-time.After(retryQueue):
 | 
						case <-time.After(retryQueue):
 | 
				
			||||||
		go r.tryRequest()
 | 
							go r.tryRequest()
 | 
				
			||||||
		r.reqQueued = true
 | 
							r.lastReqQueued = true
 | 
				
			||||||
		return r.stateRequesting
 | 
							return r.stateRequesting
 | 
				
			||||||
	case ev := <-r.eventsCh:
 | 
						case ev := <-r.eventsCh:
 | 
				
			||||||
		r.update(ev)
 | 
							r.update(ev)
 | 
				
			||||||
@@ -260,22 +260,26 @@ func (r *sentReq) stateStopped() reqStateFn {
 | 
				
			|||||||
func (r *sentReq) update(ev reqPeerEvent) {
 | 
					func (r *sentReq) update(ev reqPeerEvent) {
 | 
				
			||||||
	switch ev.event {
 | 
						switch ev.event {
 | 
				
			||||||
	case rpSent:
 | 
						case rpSent:
 | 
				
			||||||
		r.reqQueued = false
 | 
							r.lastReqQueued = false
 | 
				
			||||||
		if ev.peer != nil {
 | 
							r.lastReqSentTo = ev.peer
 | 
				
			||||||
			r.reqSent = true
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	case rpSoftTimeout:
 | 
						case rpSoftTimeout:
 | 
				
			||||||
		r.reqSent = false
 | 
							r.lastReqSentTo = nil
 | 
				
			||||||
		r.reqSrtoCount++
 | 
							r.reqSrtoCount++
 | 
				
			||||||
	case rpHardTimeout, rpDeliveredValid, rpDeliveredInvalid:
 | 
						case rpHardTimeout:
 | 
				
			||||||
		r.reqSrtoCount--
 | 
							r.reqSrtoCount--
 | 
				
			||||||
 | 
						case rpDeliveredValid, rpDeliveredInvalid:
 | 
				
			||||||
 | 
							if ev.peer == r.lastReqSentTo {
 | 
				
			||||||
 | 
								r.lastReqSentTo = nil
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								r.reqSrtoCount--
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// waiting returns true if the retrieval mechanism is waiting for an answer from
 | 
					// waiting returns true if the retrieval mechanism is waiting for an answer from
 | 
				
			||||||
// any peer
 | 
					// any peer
 | 
				
			||||||
func (r *sentReq) waiting() bool {
 | 
					func (r *sentReq) waiting() bool {
 | 
				
			||||||
	return r.reqQueued || r.reqSent || r.reqSrtoCount > 0
 | 
						return r.lastReqQueued || r.lastReqSentTo != nil || r.reqSrtoCount > 0
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// tryRequest tries to send the request to a new peer and waits for it to either
 | 
					// tryRequest tries to send the request to a new peer and waits for it to either
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user