swarm: prevent forever running retrieve request loops
This commit is contained in:
		
				
					committed by
					
						
						Janos Guljas
					
				
			
			
				
	
			
			
			
						parent
						
							d3441ebb56
						
					
				
				
					commit
					3f7acbbeb9
				
			@@ -32,6 +32,8 @@ var searchTimeout = 1 * time.Second
 | 
				
			|||||||
// Also used in stream delivery.
 | 
					// Also used in stream delivery.
 | 
				
			||||||
var RequestTimeout = 10 * time.Second
 | 
					var RequestTimeout = 10 * time.Second
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var maxHopCount uint8 = 20 // maximum number of forwarded requests (hops), to make sure requests are not forwarded forever in peer loops
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)
 | 
					type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
 | 
					// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
 | 
				
			||||||
@@ -44,7 +46,7 @@ type Fetcher struct {
 | 
				
			|||||||
	protoRequestFunc RequestFunc     // request function fetcher calls to issue retrieve request for a chunk
 | 
						protoRequestFunc RequestFunc     // request function fetcher calls to issue retrieve request for a chunk
 | 
				
			||||||
	addr             storage.Address // the address of the chunk to be fetched
 | 
						addr             storage.Address // the address of the chunk to be fetched
 | 
				
			||||||
	offerC           chan *enode.ID  // channel of sources (peer node id strings)
 | 
						offerC           chan *enode.ID  // channel of sources (peer node id strings)
 | 
				
			||||||
	requestC         chan struct{}
 | 
						requestC         chan uint8      // channel for incoming requests (with the hopCount value in it)
 | 
				
			||||||
	skipCheck        bool
 | 
						skipCheck        bool
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -53,6 +55,7 @@ type Request struct {
 | 
				
			|||||||
	Source      *enode.ID       // nodeID of peer to request from (can be nil)
 | 
						Source      *enode.ID       // nodeID of peer to request from (can be nil)
 | 
				
			||||||
	SkipCheck   bool            // whether to offer the chunk first or deliver directly
 | 
						SkipCheck   bool            // whether to offer the chunk first or deliver directly
 | 
				
			||||||
	peersToSkip *sync.Map       // peers not to request chunk from (only makes sense if source is nil)
 | 
						peersToSkip *sync.Map       // peers not to request chunk from (only makes sense if source is nil)
 | 
				
			||||||
 | 
						HopCount    uint8           // number of forwarded requests (hops)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// NewRequest returns a new instance of Request based on chunk address skip check and
 | 
					// NewRequest returns a new instance of Request based on chunk address skip check and
 | 
				
			||||||
@@ -113,7 +116,7 @@ func NewFetcher(addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
 | 
				
			|||||||
		addr:             addr,
 | 
							addr:             addr,
 | 
				
			||||||
		protoRequestFunc: rf,
 | 
							protoRequestFunc: rf,
 | 
				
			||||||
		offerC:           make(chan *enode.ID),
 | 
							offerC:           make(chan *enode.ID),
 | 
				
			||||||
		requestC:         make(chan struct{}),
 | 
							requestC:         make(chan uint8),
 | 
				
			||||||
		skipCheck:        skipCheck,
 | 
							skipCheck:        skipCheck,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -136,7 +139,7 @@ func (f *Fetcher) Offer(ctx context.Context, source *enode.ID) {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
 | 
					// Request is called when an upstream peer request the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
 | 
				
			||||||
func (f *Fetcher) Request(ctx context.Context) {
 | 
					func (f *Fetcher) Request(ctx context.Context, hopCount uint8) {
 | 
				
			||||||
	// First we need to have this select to make sure that we return if context is done
 | 
						// First we need to have this select to make sure that we return if context is done
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
	case <-ctx.Done():
 | 
						case <-ctx.Done():
 | 
				
			||||||
@@ -144,10 +147,15 @@ func (f *Fetcher) Request(ctx context.Context) {
 | 
				
			|||||||
	default:
 | 
						default:
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if hopCount >= maxHopCount {
 | 
				
			||||||
 | 
							log.Debug("fetcher request hop count limit reached", "hops", hopCount)
 | 
				
			||||||
 | 
							return
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// This select alone would not guarantee that we return of context is done, it could potentially
 | 
						// This select alone would not guarantee that we return of context is done, it could potentially
 | 
				
			||||||
	// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
 | 
						// push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
	case f.requestC <- struct{}{}:
 | 
						case f.requestC <- hopCount + 1:
 | 
				
			||||||
	case <-ctx.Done():
 | 
						case <-ctx.Done():
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -161,6 +169,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 | 
				
			|||||||
		waitC     <-chan time.Time // timer channel
 | 
							waitC     <-chan time.Time // timer channel
 | 
				
			||||||
		sources   []*enode.ID      // known sources, ie. peers that offered the chunk
 | 
							sources   []*enode.ID      // known sources, ie. peers that offered the chunk
 | 
				
			||||||
		requested bool             // true if the chunk was actually requested
 | 
							requested bool             // true if the chunk was actually requested
 | 
				
			||||||
 | 
							hopCount  uint8
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	gone := make(chan *enode.ID) // channel to signal that a peer we requested from disconnected
 | 
						gone := make(chan *enode.ID) // channel to signal that a peer we requested from disconnected
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -183,7 +192,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 | 
				
			|||||||
			doRequest = requested
 | 
								doRequest = requested
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// incoming request
 | 
							// incoming request
 | 
				
			||||||
		case <-f.requestC:
 | 
							case hopCount = <-f.requestC:
 | 
				
			||||||
			log.Trace("new request", "request addr", f.addr)
 | 
								log.Trace("new request", "request addr", f.addr)
 | 
				
			||||||
			// 2) chunk is requested, set requested flag
 | 
								// 2) chunk is requested, set requested flag
 | 
				
			||||||
			// launch a request iff none been launched yet
 | 
								// launch a request iff none been launched yet
 | 
				
			||||||
@@ -213,7 +222,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 | 
				
			|||||||
		// need to issue a new request
 | 
							// need to issue a new request
 | 
				
			||||||
		if doRequest {
 | 
							if doRequest {
 | 
				
			||||||
			var err error
 | 
								var err error
 | 
				
			||||||
			sources, err = f.doRequest(ctx, gone, peers, sources)
 | 
								sources, err = f.doRequest(ctx, gone, peers, sources, hopCount)
 | 
				
			||||||
			if err != nil {
 | 
								if err != nil {
 | 
				
			||||||
				log.Info("unable to request", "request addr", f.addr, "err", err)
 | 
									log.Info("unable to request", "request addr", f.addr, "err", err)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
@@ -251,7 +260,7 @@ func (f *Fetcher) run(ctx context.Context, peers *sync.Map) {
 | 
				
			|||||||
// * the peer's address is added to the set of peers to skip
 | 
					// * the peer's address is added to the set of peers to skip
 | 
				
			||||||
// * the peer's address is removed from prospective sources, and
 | 
					// * the peer's address is removed from prospective sources, and
 | 
				
			||||||
// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
 | 
					// * a go routine is started that reports on the gone channel if the peer is disconnected (or terminated their streamer)
 | 
				
			||||||
func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID) ([]*enode.ID, error) {
 | 
					func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
 | 
				
			||||||
	var i int
 | 
						var i int
 | 
				
			||||||
	var sourceID *enode.ID
 | 
						var sourceID *enode.ID
 | 
				
			||||||
	var quit chan struct{}
 | 
						var quit chan struct{}
 | 
				
			||||||
@@ -260,6 +269,7 @@ func (f *Fetcher) doRequest(ctx context.Context, gone chan *enode.ID, peersToSki
 | 
				
			|||||||
		Addr:        f.addr,
 | 
							Addr:        f.addr,
 | 
				
			||||||
		SkipCheck:   f.skipCheck,
 | 
							SkipCheck:   f.skipCheck,
 | 
				
			||||||
		peersToSkip: peersToSkip,
 | 
							peersToSkip: peersToSkip,
 | 
				
			||||||
 | 
							HopCount:    hopCount,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	foundSource := false
 | 
						foundSource := false
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -33,7 +33,7 @@ type mockRequester struct {
 | 
				
			|||||||
	// requests []Request
 | 
						// requests []Request
 | 
				
			||||||
	requestC  chan *Request   // when a request is coming it is pushed to requestC
 | 
						requestC  chan *Request   // when a request is coming it is pushed to requestC
 | 
				
			||||||
	waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional)
 | 
						waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional)
 | 
				
			||||||
	ctr       int             //counts the number of requests
 | 
						count     int             //counts the number of requests
 | 
				
			||||||
	quitC     chan struct{}
 | 
						quitC     chan struct{}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -47,9 +47,9 @@ func newMockRequester(waitTimes ...time.Duration) *mockRequester {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode.ID, chan struct{}, error) {
 | 
					func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode.ID, chan struct{}, error) {
 | 
				
			||||||
	waitTime := time.Duration(0)
 | 
						waitTime := time.Duration(0)
 | 
				
			||||||
	if m.ctr < len(m.waitTimes) {
 | 
						if m.count < len(m.waitTimes) {
 | 
				
			||||||
		waitTime = m.waitTimes[m.ctr]
 | 
							waitTime = m.waitTimes[m.count]
 | 
				
			||||||
		m.ctr++
 | 
							m.count++
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	time.Sleep(waitTime)
 | 
						time.Sleep(waitTime)
 | 
				
			||||||
	m.requestC <- request
 | 
						m.requestC <- request
 | 
				
			||||||
@@ -83,7 +83,7 @@ func TestFetcherSingleRequest(t *testing.T) {
 | 
				
			|||||||
	go fetcher.run(ctx, peersToSkip)
 | 
						go fetcher.run(ctx, peersToSkip)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	rctx := context.Background()
 | 
						rctx := context.Background()
 | 
				
			||||||
	fetcher.Request(rctx)
 | 
						fetcher.Request(rctx, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
	case request := <-requester.requestC:
 | 
						case request := <-requester.requestC:
 | 
				
			||||||
@@ -100,6 +100,11 @@ func TestFetcherSingleRequest(t *testing.T) {
 | 
				
			|||||||
			t.Fatalf("request.peersToSkip does not contain peer returned by the request function")
 | 
								t.Fatalf("request.peersToSkip does not contain peer returned by the request function")
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// hopCount in the forwarded request should be incremented
 | 
				
			||||||
 | 
							if request.HopCount != 1 {
 | 
				
			||||||
 | 
								t.Fatalf("Expected request.HopCount 1 got %v", request.HopCount)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// fetch should trigger a request, if it doesn't happen in time, test should fail
 | 
							// fetch should trigger a request, if it doesn't happen in time, test should fail
 | 
				
			||||||
	case <-time.After(200 * time.Millisecond):
 | 
						case <-time.After(200 * time.Millisecond):
 | 
				
			||||||
		t.Fatalf("fetch timeout")
 | 
							t.Fatalf("fetch timeout")
 | 
				
			||||||
@@ -123,7 +128,7 @@ func TestFetcherCancelStopsFetcher(t *testing.T) {
 | 
				
			|||||||
	rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
 | 
						rctx, rcancel := context.WithTimeout(ctx, 100*time.Millisecond)
 | 
				
			||||||
	defer rcancel()
 | 
						defer rcancel()
 | 
				
			||||||
	// we call Request with an active context
 | 
						// we call Request with an active context
 | 
				
			||||||
	fetcher.Request(rctx)
 | 
						fetcher.Request(rctx, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
 | 
						// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
@@ -151,7 +156,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
 | 
				
			|||||||
	rcancel()
 | 
						rcancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// we call Request with a cancelled context
 | 
						// we call Request with a cancelled context
 | 
				
			||||||
	fetcher.Request(rctx)
 | 
						fetcher.Request(rctx, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
 | 
						// fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
@@ -162,7 +167,7 @@ func TestFetcherCancelStopsRequest(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
 | 
						// if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
 | 
				
			||||||
	rctx = context.Background()
 | 
						rctx = context.Background()
 | 
				
			||||||
	fetcher.Request(rctx)
 | 
						fetcher.Request(rctx, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
	case <-requester.requestC:
 | 
						case <-requester.requestC:
 | 
				
			||||||
@@ -200,7 +205,7 @@ func TestFetcherOfferUsesSource(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// call Request after the Offer
 | 
						// call Request after the Offer
 | 
				
			||||||
	rctx = context.Background()
 | 
						rctx = context.Background()
 | 
				
			||||||
	fetcher.Request(rctx)
 | 
						fetcher.Request(rctx, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// there should be exactly 1 request coming from fetcher
 | 
						// there should be exactly 1 request coming from fetcher
 | 
				
			||||||
	var request *Request
 | 
						var request *Request
 | 
				
			||||||
@@ -241,7 +246,7 @@ func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// call Request first
 | 
						// call Request first
 | 
				
			||||||
	rctx := context.Background()
 | 
						rctx := context.Background()
 | 
				
			||||||
	fetcher.Request(rctx)
 | 
						fetcher.Request(rctx, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// there should be a request coming from fetcher
 | 
						// there should be a request coming from fetcher
 | 
				
			||||||
	var request *Request
 | 
						var request *Request
 | 
				
			||||||
@@ -296,7 +301,7 @@ func TestFetcherRetryOnTimeout(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// call the fetch function with an active context
 | 
						// call the fetch function with an active context
 | 
				
			||||||
	rctx := context.Background()
 | 
						rctx := context.Background()
 | 
				
			||||||
	fetcher.Request(rctx)
 | 
						fetcher.Request(rctx, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// after 100ms the first request should be initiated
 | 
						// after 100ms the first request should be initiated
 | 
				
			||||||
	time.Sleep(100 * time.Millisecond)
 | 
						time.Sleep(100 * time.Millisecond)
 | 
				
			||||||
@@ -338,7 +343,7 @@ func TestFetcherFactory(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
 | 
						fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	fetcher.Request(context.Background())
 | 
						fetcher.Request(context.Background(), 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// check if the created fetchFunction really starts a fetcher and initiates a request
 | 
						// check if the created fetchFunction really starts a fetcher and initiates a request
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
@@ -368,7 +373,7 @@ func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
 | 
				
			|||||||
	go fetcher.run(ctx, peersToSkip)
 | 
						go fetcher.run(ctx, peersToSkip)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	rctx := context.Background()
 | 
						rctx := context.Background()
 | 
				
			||||||
	fetcher.Request(rctx)
 | 
						fetcher.Request(rctx, 0)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	select {
 | 
						select {
 | 
				
			||||||
	case <-requester.requestC:
 | 
						case <-requester.requestC:
 | 
				
			||||||
@@ -457,3 +462,26 @@ func TestRequestSkipPeerPermanent(t *testing.T) {
 | 
				
			|||||||
		t.Errorf("peer not skipped")
 | 
							t.Errorf("peer not skipped")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func TestFetcherMaxHopCount(t *testing.T) {
 | 
				
			||||||
 | 
						requester := newMockRequester()
 | 
				
			||||||
 | 
						addr := make([]byte, 32)
 | 
				
			||||||
 | 
						fetcher := NewFetcher(addr, requester.doRequest, true)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ctx, cancel := context.WithCancel(context.Background())
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						peersToSkip := &sync.Map{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						go fetcher.run(ctx, peersToSkip)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						rctx := context.Background()
 | 
				
			||||||
 | 
						fetcher.Request(rctx, maxHopCount)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// if hopCount is already at max no request should be initiated
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case <-requester.requestC:
 | 
				
			||||||
 | 
							t.Fatalf("cancelled fetcher initiated request")
 | 
				
			||||||
 | 
						case <-time.After(200 * time.Millisecond):
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -128,6 +128,7 @@ func (s *SwarmChunkServer) GetData(ctx context.Context, key []byte) ([]byte, err
 | 
				
			|||||||
type RetrieveRequestMsg struct {
 | 
					type RetrieveRequestMsg struct {
 | 
				
			||||||
	Addr      storage.Address
 | 
						Addr      storage.Address
 | 
				
			||||||
	SkipCheck bool
 | 
						SkipCheck bool
 | 
				
			||||||
 | 
						HopCount  uint8
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
 | 
					func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
 | 
				
			||||||
@@ -148,7 +149,9 @@ func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	var cancel func()
 | 
						var cancel func()
 | 
				
			||||||
	// TODO: do something with this hardcoded timeout, maybe use TTL in the future
 | 
						// TODO: do something with this hardcoded timeout, maybe use TTL in the future
 | 
				
			||||||
	ctx, cancel = context.WithTimeout(context.WithValue(ctx, "peer", sp.ID().String()), network.RequestTimeout)
 | 
						ctx = context.WithValue(ctx, "peer", sp.ID().String())
 | 
				
			||||||
 | 
						ctx = context.WithValue(ctx, "hopcount", req.HopCount)
 | 
				
			||||||
 | 
						ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go func() {
 | 
						go func() {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
@@ -247,6 +250,7 @@ func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (
 | 
				
			|||||||
	err := sp.SendPriority(ctx, &RetrieveRequestMsg{
 | 
						err := sp.SendPriority(ctx, &RetrieveRequestMsg{
 | 
				
			||||||
		Addr:      req.Addr,
 | 
							Addr:      req.Addr,
 | 
				
			||||||
		SkipCheck: req.SkipCheck,
 | 
							SkipCheck: req.SkipCheck,
 | 
				
			||||||
 | 
							HopCount:  req.HopCount,
 | 
				
			||||||
	}, Top)
 | 
						}, Top)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return nil, nil, err
 | 
							return nil, nil, err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -639,7 +639,7 @@ func (c *clientParams) clientCreated() {
 | 
				
			|||||||
// Spec is the spec of the streamer protocol
 | 
					// Spec is the spec of the streamer protocol
 | 
				
			||||||
var Spec = &protocols.Spec{
 | 
					var Spec = &protocols.Spec{
 | 
				
			||||||
	Name:       "stream",
 | 
						Name:       "stream",
 | 
				
			||||||
	Version:    6,
 | 
						Version:    7,
 | 
				
			||||||
	MaxMsgSize: 10 * 1024 * 1024,
 | 
						MaxMsgSize: 10 * 1024 * 1024,
 | 
				
			||||||
	Messages: []interface{}{
 | 
						Messages: []interface{}{
 | 
				
			||||||
		UnsubscribeMsg{},
 | 
							UnsubscribeMsg{},
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -40,7 +40,7 @@ func (t *TestHandler) Close() {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
type mockNetFetcher struct{}
 | 
					type mockNetFetcher struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *mockNetFetcher) Request(ctx context.Context) {
 | 
					func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
 | 
					func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -34,7 +34,7 @@ type (
 | 
				
			|||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type NetFetcher interface {
 | 
					type NetFetcher interface {
 | 
				
			||||||
	Request(ctx context.Context)
 | 
						Request(ctx context.Context, hopCount uint8)
 | 
				
			||||||
	Offer(ctx context.Context, source *enode.ID)
 | 
						Offer(ctx context.Context, source *enode.ID)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -263,6 +263,9 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// If there is a source in the context then it is an offer, otherwise a request
 | 
						// If there is a source in the context then it is an offer, otherwise a request
 | 
				
			||||||
	sourceIF := rctx.Value("source")
 | 
						sourceIF := rctx.Value("source")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						hopCount, _ := rctx.Value("hopcount").(uint8)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if sourceIF != nil {
 | 
						if sourceIF != nil {
 | 
				
			||||||
		var source enode.ID
 | 
							var source enode.ID
 | 
				
			||||||
		if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil {
 | 
							if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil {
 | 
				
			||||||
@@ -270,7 +273,7 @@ func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		f.netFetcher.Offer(rctx, &source)
 | 
							f.netFetcher.Offer(rctx, &source)
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		f.netFetcher.Request(rctx)
 | 
							f.netFetcher.Request(rctx, hopCount)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// wait until either the chunk is delivered or the context is done
 | 
						// wait until either the chunk is delivered or the context is done
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -40,6 +40,7 @@ type mockNetFetcher struct {
 | 
				
			|||||||
	offerCalled     bool
 | 
						offerCalled     bool
 | 
				
			||||||
	quit            <-chan struct{}
 | 
						quit            <-chan struct{}
 | 
				
			||||||
	ctx             context.Context
 | 
						ctx             context.Context
 | 
				
			||||||
 | 
						hopCounts       []uint8
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
 | 
					func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
 | 
				
			||||||
@@ -47,7 +48,7 @@ func (m *mockNetFetcher) Offer(ctx context.Context, source *enode.ID) {
 | 
				
			|||||||
	m.sources = append(m.sources, source)
 | 
						m.sources = append(m.sources, source)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (m *mockNetFetcher) Request(ctx context.Context) {
 | 
					func (m *mockNetFetcher) Request(ctx context.Context, hopCount uint8) {
 | 
				
			||||||
	m.requestCalled = true
 | 
						m.requestCalled = true
 | 
				
			||||||
	var peers []Address
 | 
						var peers []Address
 | 
				
			||||||
	m.peers.Range(func(key interface{}, _ interface{}) bool {
 | 
						m.peers.Range(func(key interface{}, _ interface{}) bool {
 | 
				
			||||||
@@ -55,6 +56,7 @@ func (m *mockNetFetcher) Request(ctx context.Context) {
 | 
				
			|||||||
		return true
 | 
							return true
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
	m.peersPerRequest = append(m.peersPerRequest, peers)
 | 
						m.peersPerRequest = append(m.peersPerRequest, peers)
 | 
				
			||||||
 | 
						m.hopCounts = append(m.hopCounts, hopCount)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type mockNetFetchFuncFactory struct {
 | 
					type mockNetFetchFuncFactory struct {
 | 
				
			||||||
@@ -412,7 +414,8 @@ func TestNetStoreGetCallsRequest(t *testing.T) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	chunk := GenerateRandomChunk(ch.DefaultSize)
 | 
						chunk := GenerateRandomChunk(ch.DefaultSize)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
 | 
						ctx := context.WithValue(context.Background(), "hopcount", uint8(5))
 | 
				
			||||||
 | 
						ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
 | 
				
			||||||
	defer cancel()
 | 
						defer cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// We call get for a not available chunk, it will timeout because the chunk is not delivered
 | 
						// We call get for a not available chunk, it will timeout because the chunk is not delivered
 | 
				
			||||||
@@ -426,6 +429,10 @@ func TestNetStoreGetCallsRequest(t *testing.T) {
 | 
				
			|||||||
	if !fetcher.requestCalled {
 | 
						if !fetcher.requestCalled {
 | 
				
			||||||
		t.Fatal("Expected NetFetcher.Request to be called")
 | 
							t.Fatal("Expected NetFetcher.Request to be called")
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						if fetcher.hopCounts[0] != 5 {
 | 
				
			||||||
 | 
							t.Fatalf("Expected NetFetcher.Request be called with hopCount 5, got %v", fetcher.hopCounts[0])
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk
 | 
					// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user