diff --git a/network/fetcher.go b/network/fetcher.go
new file mode 100644
index 0000000000..0908c31efd
--- /dev/null
+++ b/network/fetcher.go
@@ -0,0 +1,336 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/tracing"
+ olog "github.com/opentracing/opentracing-go/log"
+)
+
+const (
+ defaultSearchTimeout = 1 * time.Second
+ // maximum number of forwarded requests (hops), to make sure requests are not
+ // forwarded forever in peer loops
+ maxHopCount uint8 = 20
+)
+
+// RequestTimeout is the time period for which a peer stays in the peers-to-skip
+// set after a chunk was requested from it. Also used in stream delivery.
+var RequestTimeout = 10 * time.Second
+
+type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error)
+
+// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and
+// keeps it alive until all active requests are completed. This can happen:
+// 1. either because the chunk is delivered
+// 2. or because the requester cancelled/timed out
+// The Fetcher destroys itself once it has completed.
+// TODO: cancel all forward requests after termination
+type Fetcher struct {
+ protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk
+ addr storage.Address // the address of the chunk to be fetched
+ offerC chan *enode.ID // channel of sources (peer node ids)
+ requestC chan uint8 // channel for incoming requests (with the hopCount value in it)
+ searchTimeout time.Duration
+ skipCheck bool
+ ctx context.Context
+}
+
+type Request struct {
+ Addr storage.Address // chunk address
+ Source *enode.ID // nodeID of peer to request from (can be nil)
+ SkipCheck bool // whether to offer the chunk first or deliver directly
+ peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil)
+ HopCount uint8 // number of forwarded requests (hops)
+}
+
+// NewRequest returns a new instance of Request based on chunk address, skip check and
+// a map of peers to skip.
+func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request {
+ return &Request{
+ Addr: addr,
+ SkipCheck: skipCheck,
+ peersToSkip: peersToSkip,
+ }
+}
+
+// SkipPeer returns whether the peer with nodeID should not be requested to deliver a chunk.
+// Peers to skip are kept per Request and for a time period of RequestTimeout.
+// This function is used in the stream package in Delivery.RequestFromPeers to optimize
+// requests for chunks.
+func (r *Request) SkipPeer(nodeID string) bool {
+ val, ok := r.peersToSkip.Load(nodeID)
+ if !ok {
+ return false
+ }
+ t, ok := val.(time.Time)
+ if ok && time.Now().After(t.Add(RequestTimeout)) {
+ // deadline expired
+ r.peersToSkip.Delete(nodeID)
+ return false
+ }
+ return true
+}
+
+// FetcherFactory is initialised with a request function and can create fetchers
+type FetcherFactory struct {
+ request RequestFunc
+ skipCheck bool
+}
+
+// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory
+func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory {
+ return &FetcherFactory{
+ request: request,
+ skipCheck: skipCheck,
+ }
+}
+
+// New constructs a new Fetcher for the given chunk. None of the peers in peersToSkip
+// are requested to deliver the given chunk. peersToSkip should always
+// contain the peers which are actively requesting this chunk, to make sure we
+// don't request the chunk back from them.
+// The created Fetcher is started and returned.
+func (f *FetcherFactory) New(ctx context.Context, addr storage.Address, peers *sync.Map) storage.NetFetcher {
+ fetcher := NewFetcher(ctx, addr, f.request, f.skipCheck)
+ go fetcher.run(peers)
+ return fetcher
+}
+
+// NewFetcher creates a new Fetcher for the given chunk address using the given request function.
+func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher {
+ return &Fetcher{
+ addr: addr,
+ protoRequestFunc: rf,
+ offerC: make(chan *enode.ID),
+ requestC: make(chan uint8),
+ searchTimeout: defaultSearchTimeout,
+ skipCheck: skipCheck,
+ ctx: ctx,
+ }
+}
+
+// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally.
+func (f *Fetcher) Offer(source *enode.ID) {
+ // First we need to have this select to make sure that we return if context is done
+ select {
+ case <-f.ctx.Done():
+ return
+ default:
+ }
+
+ // This select alone would not guarantee that we return if the context is done, it could potentially
+ // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
+ select {
+ case f.offerC <- source:
+ case <-f.ctx.Done():
+ }
+}
+
+// Request is called when an upstream peer requests the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally.
+func (f *Fetcher) Request(hopCount uint8) {
+ // First we need to have this select to make sure that we return if context is done
+ select {
+ case <-f.ctx.Done():
+ return
+ default:
+ }
+
+ if hopCount >= maxHopCount {
+ log.Debug("fetcher request hop count limit reached", "hops", hopCount)
+ return
+ }
+
+ // This select alone would not guarantee that we return if the context is done, it could potentially
+ // push to requestC instead if requestC is available (see number 2 in https://golang.org/ref/spec#Select_statements)
+ select {
+ case f.requestC <- hopCount + 1:
+ case <-f.ctx.Done():
+ }
+}
+
+// run is the request handler loop of the Fetcher.
+// It keeps the Fetcher alive within the lifecycle of the passed context.
+func (f *Fetcher) run(peers *sync.Map) {
+ var (
+ doRequest bool // determines if retrieval is initiated in the current iteration
+ wait *time.Timer // timer for search timeout
+ waitC <-chan time.Time // timer channel
+ sources []*enode.ID // known sources, i.e. peers that offered the chunk
+ requested bool // true if the chunk was actually requested
+ hopCount uint8
+ )
+ gone := make(chan *enode.ID) // channel to signal that a peer we requested from disconnected
+
+ // loop that keeps the fetching process alive
+ // after every request a timer is set. If this goes off we request again from another peer
+ // note that the previous request is still alive and has the chance to deliver, so
+ // requesting again extends the search. I.e.,
+ // if a peer we requested from is gone we issue a new request, so the number of active
+ // requests never decreases
+ for {
+ select {
+
+ // incoming offer
+ case source := <-f.offerC:
+ log.Trace("new source", "peer addr", source, "request addr", f.addr)
+ // 1) the chunk is offered by a syncing peer
+ // add to known sources
+ sources = append(sources, source)
+ // launch a request to the source iff the chunk was requested (not just expected because it's offered by a syncing peer)
+ doRequest = requested
+
+ // incoming request
+ case hopCount = <-f.requestC:
+ // 2) chunk is requested, set requested flag
+ // launch a request iff none has been launched yet
+ doRequest = !requested
+ log.Trace("new request", "request addr", f.addr, "doRequest", doRequest)
+ requested = true
+
+ // peer we requested from is gone. fall back to another
+ // and remove the peer from the peers map
+ case id := <-gone:
+ peers.Delete(id.String())
+ doRequest = requested
+ log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr, "doRequest", doRequest)
+
+ // search timeout: too much time passed since the last request,
+ // extend the search to a new peer if we can find one
+ case <-waitC:
+ doRequest = requested
+ log.Trace("search timed out: requesting", "request addr", f.addr, "doRequest", doRequest)
+
+ // the Fetcher's context is done, we can quit
+ case <-f.ctx.Done():
+ log.Trace("terminate fetcher", "request addr", f.addr)
+ // TODO: send cancellations to all peers left over in peers map (i.e., those we requested from)
+ return
+ }
+
+ // need to issue a new request
+ if doRequest {
+ var err error
+ sources, err = f.doRequest(gone, peers, sources, hopCount)
+ if err != nil {
+ log.Info("unable to request", "request addr", f.addr, "err", err)
+ }
+ }
+
+ // if wait channel is not set, set it to a timer
+ if requested {
+ if wait == nil {
+ wait = time.NewTimer(f.searchTimeout)
+ defer wait.Stop()
+ waitC = wait.C
+ } else {
+ // stop the timer and drain the channel if it was not drained earlier
+ if !wait.Stop() {
+ select {
+ case <-wait.C:
+ default:
+ }
+ }
+ // reset the timer to go off after f.searchTimeout
+ wait.Reset(f.searchTimeout)
+ }
+ }
+ doRequest = false
+ }
+}
+
+// doRequest attempts to find a peer to request the chunk from
+// * first it tries to request explicitly from peers that are known to have offered the chunk
+// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address
+// excluding those in the peersToSkip map
+// * if no such peer is found an error is returned
+//
+// if a request is successful,
+// * the peer's address is added to the set of peers to skip
+// * the peer's address is removed from prospective sources, and
+// * a goroutine is started that reports on the gone channel if the peer is disconnected (or terminated its streamer)
+func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) {
+ var i int
+ var sourceID *enode.ID
+ var quit chan struct{}
+
+ req := &Request{
+ Addr: f.addr,
+ SkipCheck: f.skipCheck,
+ peersToSkip: peersToSkip,
+ HopCount: hopCount,
+ }
+
+ foundSource := false
+ // iterate over known sources
+ for i = 0; i < len(sources); i++ {
+ req.Source = sources[i]
+ var err error
+ log.Trace("fetcher.doRequest", "request addr", f.addr, "peer", req.Source.String())
+ sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
+ if err == nil {
+ // remove the peer from known sources
+ // Note: we can modify sources although we are looping over it, because we break from the loop immediately
+ sources = append(sources[:i], sources[i+1:]...)
+ foundSource = true
+ break
+ }
+ }
+
+ // if there are no known sources, or none available, we try to request from the closest node
+ if !foundSource {
+ req.Source = nil
+ var err error
+ sourceID, quit, err = f.protoRequestFunc(f.ctx, req)
+ if err != nil {
+ // if no peers found to request from
+ return sources, err
+ }
+ }
+ // add the peer to the set of peers to skip from now on
+ peersToSkip.Store(sourceID.String(), time.Now())
+
+ // if the quit channel is closed, it indicates that the source peer we requested from
+ // disconnected or terminated its streamer
+ // here we start a goroutine that watches this channel and reports the source peer on the gone channel
+ // this goroutine quits when the fetcher's global context is done, to prevent a goroutine leak
+ go func() {
+ select {
+ case <-quit:
+ gone <- sourceID
+ case <-f.ctx.Done():
+ }
+
+ // finish the request span
+ spanId := fmt.Sprintf("stream.send.request.%v.%v", *sourceID, req.Addr)
+ span := tracing.ShiftSpanByKey(spanId)
+
+ if span != nil {
+ span.LogFields(olog.String("finish", "from doRequest"))
+ span.Finish()
+ }
+ }()
+ return sources, nil
+}
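For orientation, a minimal usage sketch of the API restored above: a FetcherFactory wraps a RequestFunc (in production this is Delivery.RequestFromPeers, see below) and the Fetcher it creates is driven via Request and Offer. This assumes the storage.NetFetcher interface returned by New exposes the Request and Offer methods shown above; the stub request function, the zero peer id and the final sleep are illustrative placeholders only.

```go
package main

import (
	"context"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethersphere/swarm/network"
	"github.com/ethersphere/swarm/storage"
)

func main() {
	var peerID enode.ID // zero id, placeholder for a real peer

	// stub request function standing in for Delivery.RequestFromPeers
	requestFunc := func(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
		// pretend the request was forwarded to peerID; the returned channel
		// would be closed when that peer disconnects
		return &peerID, make(chan struct{}), nil
	}

	factory := network.NewFetcherFactory(requestFunc, false)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	addr := storage.Address(make([]byte, 32)) // placeholder chunk address
	peersToSkip := &sync.Map{}                // peers that must not be asked

	fetcher := factory.New(ctx, addr, peersToSkip) // starts the run loop in a goroutine

	fetcher.Request(0)     // the chunk was requested, hop count 0
	fetcher.Offer(&peerID) // a syncing peer offered the chunk, remember it as a source

	time.Sleep(100 * time.Millisecond) // give the run loop a chance to issue the request
}
```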
diff --git a/network/fetcher_test.go b/network/fetcher_test.go
new file mode 100644
index 0000000000..4e464f10f3
--- /dev/null
+++ b/network/fetcher_test.go
@@ -0,0 +1,477 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package network
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/p2p/enode"
+)
+
+var requestedPeerID = enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+var sourcePeerID = enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9")
+
+// mockRequester pushes every request to the requestC channel when its doRequest function is called
+type mockRequester struct {
+ // requests []Request
+ requestC chan *Request // when a request is coming it is pushed to requestC
+ waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional)
+ count int // counts the number of requests
+ quitC chan struct{}
+}
+
+func newMockRequester(waitTimes ...time.Duration) *mockRequester {
+ return &mockRequester{
+ requestC: make(chan *Request),
+ waitTimes: waitTimes,
+ quitC: make(chan struct{}),
+ }
+}
+
+func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode.ID, chan struct{}, error) {
+ waitTime := time.Duration(0)
+ if m.count < len(m.waitTimes) {
+ waitTime = m.waitTimes[m.count]
+ m.count++
+ }
+ time.Sleep(waitTime)
+ m.requestC <- request
+
+ // if there is a Source in the request use that, if not use the global requestedPeerID
+ source := request.Source
+ if source == nil {
+ source = &requestedPeerID
+ }
+ return source, m.quitC, nil
+}
+
+// TestFetcherSingleRequest creates a Fetcher using mockRequester, and runs it with a sample set of peers to skip.
+// mockRequester pushes a Request on a channel every time the request function is called. Using
+// this channel we test if calling Fetcher.Request calls the request function, and whether it uses
+// the correct peers to skip which we provided for the fetcher.run function.
+func TestFetcherSingleRequest(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peers := []string{"a", "b", "c", "d"}
+ peersToSkip := &sync.Map{}
+ for _, p := range peers {
+ peersToSkip.Store(p, time.Now())
+ }
+
+ go fetcher.run(peersToSkip)
+
+ fetcher.Request(0)
+
+ select {
+ case request := <-requester.requestC:
+ // request should contain all peers from peersToSkip provided to the fetcher
+ for _, p := range peers {
+ if _, ok := request.peersToSkip.Load(p); !ok {
+ t.Fatalf("request.peersToSkip misses peer")
+ }
+ }
+
+ // source peer should be also added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(requestedPeerID.String()); !ok {
+ t.Fatalf("request.peersToSkip does not contain peer returned by the request function")
+ }
+
+ // hopCount in the forwarded request should be incremented
+ if request.HopCount != 1 {
+ t.Fatalf("Expected request.HopCount 1 got %v", request.HopCount)
+ }
+
+ // fetch should trigger a request, if it doesn't happen in time, test should fail
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetch timeout")
+ }
+}
+
+// TestFetcherCancelStopsFetcher tests that a cancelled fetcher does not initiate further requests even if its fetch function is called
+func TestFetcherCancelStopsFetcher(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ // we start the fetcher, and then we immediately cancel the context
+ go fetcher.run(peersToSkip)
+ cancel()
+
+ // we call Request, but the fetcher's context is already cancelled
+ fetcher.Request(0)
+
+ // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
+ select {
+ case <-requester.requestC:
+ t.Fatalf("cancelled fetcher initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+}
+
+// TestFetcherCancelStopsRequest tests that calling Request with a cancelled context does not initiate a request
+func TestFetcherCancelStopsRequest(t *testing.T) {
+ t.Skip("since context is now per fetcher, this test is likely redundant")
+
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ // we start the fetcher with an active context
+ go fetcher.run(peersToSkip)
+
+ // we call Request with a cancelled context
+ fetcher.Request(0)
+
+ // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening
+ select {
+ case <-requester.requestC:
+ t.Fatalf("cancelled fetch function initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled
+ fetcher.Request(0)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("expected request")
+ }
+}
+
+// TestFetcherOfferUsesSource tests the Fetcher Offer behavior.
+// In this case there should be 1 (and only one) request initiated from the source peer, and the
+// source nodeid should appear in the peersToSkip map.
+func TestFetcherOfferUsesSource(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ // start the fetcher
+ go fetcher.run(peersToSkip)
+
+ // call the Offer function with the source peer
+ fetcher.Offer(&sourcePeerID)
+
+ // fetcher should not initiate request
+ select {
+ case <-requester.requestC:
+ t.Fatalf("fetcher initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // call Request after the Offer
+ fetcher.Request(0)
+
+ // there should be exactly 1 request coming from fetcher
+ var request *Request
+ select {
+ case request = <-requester.requestC:
+ if *request.Source != sourcePeerID {
+ t.Fatalf("Expected source id %v got %v", sourcePeerID, request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ select {
+ case <-requester.requestC:
+ t.Fatalf("Fetcher number of requests expected 1 got 2")
+ case <-time.After(200 * time.Millisecond):
+ }
+
+ // source peer should be added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok {
+ t.Fatalf("SourcePeerId not added to peersToSkip")
+ }
+}
+
+func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ // start the fetcher
+ go fetcher.run(peersToSkip)
+
+ // call Request first
+ fetcher.Request(0)
+
+ // there should be a request coming from fetcher
+ var request *Request
+ select {
+ case request = <-requester.requestC:
+ if request.Source != nil {
+ t.Fatalf("Incorrect source peer id, expected nil got %v", request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ // after the Request call Offer
+ fetcher.Offer(&sourcePeerID)
+
+ // there should be a request coming from fetcher
+ select {
+ case request = <-requester.requestC:
+ if *request.Source != sourcePeerID {
+ t.Fatalf("Incorrect source peer id, expected %v got %v", sourcePeerID, request.Source)
+ }
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetcher did not initiate request")
+ }
+
+ // source peer should be added to peersToSkip eventually
+ time.Sleep(100 * time.Millisecond)
+ if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok {
+ t.Fatalf("SourcePeerId not added to peersToSkip")
+ }
+}
+
+// TestFetcherRetryOnTimeout tests that the fetcher retries the request after searchTimeout has passed
+func TestFetcherRetryOnTimeout(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+ // set searchTimeout to a low value so the test is quicker
+ fetcher.searchTimeout = 250 * time.Millisecond
+
+ peersToSkip := &sync.Map{}
+
+ // start the fetcher
+ go fetcher.run(peersToSkip)
+
+ // call Request while the fetcher's context is active
+ fetcher.Request(0)
+
+ // after 100ms the first request should be initiated
+ time.Sleep(100 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ default:
+ t.Fatalf("fetch did not initiate request")
+ }
+
+ // after another 100ms no new request should be initiated, because search timeout is 250ms
+ time.Sleep(100 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ t.Fatalf("unexpected request from fetcher")
+ default:
+ }
+
+ // after another 300ms search timeout is over, there should be a new request
+ time.Sleep(300 * time.Millisecond)
+
+ select {
+ case <-requester.requestC:
+ default:
+ t.Fatalf("fetch did not retry request")
+ }
+}
+
+// TestFetcherFactory creates a FetcherFactory and checks if the factory really creates and starts
+// a Fetcher when its New function is called. We test the fetching functionality just by checking if
+// a request is initiated when Request is called on the returned fetcher
+func TestFetcherFactory(t *testing.T) {
+ requester := newMockRequester(100 * time.Millisecond)
+ addr := make([]byte, 32)
+ fetcherFactory := NewFetcherFactory(requester.doRequest, false)
+
+ peersToSkip := &sync.Map{}
+
+ fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip)
+
+ fetcher.Request(0)
+
+ // check if the factory-created fetcher really starts and initiates a request
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("fetch timeout")
+ }
+
+}
+
+func TestFetcherRequestQuitRetriesRequest(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ // make the searchTimeout long enough to ensure the request is not
+ // retried because of a timeout
+ fetcher.searchTimeout = 10 * time.Second
+
+ peersToSkip := &sync.Map{}
+
+ go fetcher.run(peersToSkip)
+
+ fetcher.Request(0)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("request is not initiated")
+ }
+
+ close(requester.quitC)
+
+ select {
+ case <-requester.requestC:
+ case <-time.After(200 * time.Millisecond):
+ t.Fatalf("request is not initiated after failed request")
+ }
+}
+
+// TestRequestSkipPeer checks that the SkipPeer function skips a provided peer
+// and does not skip an unknown one.
+func TestRequestSkipPeer(t *testing.T) {
+ addr := make([]byte, 32)
+ peers := []enode.ID{
+ enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8"),
+ enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9"),
+ }
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peers[0].String(), time.Now())
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peers[0].String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ if r.SkipPeer(peers[1].String()) {
+ t.Errorf("peer skipped")
+ }
+}
+
+// TestRequestSkipPeerExpired checks that a peer to skip is no longer skipped
+// after RequestTimeout has passed.
+func TestRequestSkipPeerExpired(t *testing.T) {
+ addr := make([]byte, 32)
+ peer := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+
+ // set RequestTimeout to a low value and reset it after the test
+ defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout)
+ RequestTimeout = 250 * time.Millisecond
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peer.String(), time.Now())
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ time.Sleep(500 * time.Millisecond)
+
+ if r.SkipPeer(peer.String()) {
+ t.Errorf("peer skipped")
+ }
+}
+
+// TestRequestSkipPeerPermanent checks that a peer is still skipped after
+// RequestTimeout has passed if it was stored in the peersToSkip map with a
+// value that is not a time.Time, which marks it for permanent skipping.
+func TestRequestSkipPeerPermanent(t *testing.T) {
+ addr := make([]byte, 32)
+ peer := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8")
+
+ // set RequestTimeout to a low value and reset it after the test
+ defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout)
+ RequestTimeout = 250 * time.Millisecond
+
+ peersToSkip := new(sync.Map)
+ peersToSkip.Store(peer.String(), true)
+ r := NewRequest(addr, false, peersToSkip)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+
+ time.Sleep(500 * time.Millisecond)
+
+ if !r.SkipPeer(peer.String()) {
+ t.Errorf("peer not skipped")
+ }
+}
+
+func TestFetcherMaxHopCount(t *testing.T) {
+ requester := newMockRequester()
+ addr := make([]byte, 32)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fetcher := NewFetcher(ctx, addr, requester.doRequest, true)
+
+ peersToSkip := &sync.Map{}
+
+ go fetcher.run(peersToSkip)
+
+ // a Request with hopCount already at max should not initiate a retrieve request
+ fetcher.Request(maxHopCount)
+ select {
+ case <-requester.requestC:
+ t.Fatalf("cancelled fetcher initiated request")
+ case <-time.After(200 * time.Millisecond):
+ }
+}
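The three SkipPeer tests above reduce to the following behaviour; a standalone sketch, with placeholder peer-id strings instead of real enode IDs:

```go
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/ethersphere/swarm/network"
)

func main() {
	peersToSkip := new(sync.Map)
	peersToSkip.Store("peer-a", time.Now()) // skipped until RequestTimeout elapses
	peersToSkip.Store("peer-b", true)       // non-time.Time value: skipped permanently

	r := network.NewRequest(make([]byte, 32), false, peersToSkip)

	fmt.Println(r.SkipPeer("peer-a")) // true while the entry is fresh, false after RequestTimeout
	fmt.Println(r.SkipPeer("peer-b")) // always true
	fmt.Println(r.SkipPeer("peer-c")) // false: an unknown peer is never skipped
}
```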
diff --git a/network/stream/common_test.go b/network/stream/common_test.go
index 739ff548fb..6d7d7d68e5 100644
--- a/network/stream/common_test.go
+++ b/network/stream/common_test.go
@@ -56,6 +56,7 @@ var (
bucketKeyStore = simulation.BucketKey("store")
bucketKeyFileStore = simulation.BucketKey("filestore")
+ bucketKeyNetStore = simulation.BucketKey("netstore")
bucketKeyDelivery = simulation.BucketKey("delivery")
bucketKeyRegistry = simulation.BucketKey("registry")
@@ -80,7 +81,7 @@ func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*ne
return nil, nil, nil, nil, err
}
- netStore.RemoteGet = delivery.RequestFromPeers
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
return addr, netStore, delivery, cleanup, nil
}
@@ -92,13 +93,13 @@ func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *syn
return nil, nil, nil, err
}
- netStore.RemoteGet = delivery.RequestFromPeers
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
return netStore, delivery, cleanup, nil
}
// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
-func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf storage.RemoteGetFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
+func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
@@ -106,7 +107,7 @@ func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket
return nil, nil, nil, nil, err
}
- netStore.RemoteGet = rf
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
return addr, netStore, delivery, cleanup, nil
}
@@ -119,9 +120,14 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map,
return nil, nil, nil, err
}
- netStore := storage.NewNetStore(localStore, enode.ID{})
- lnetStore := storage.NewLNetStore(netStore)
- fileStore := storage.NewFileStore(lnetStore, storage.NewFileStoreParams(), chunk.NewTags())
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ localStore.Close()
+ localStoreCleanup()
+ return nil, nil, nil, err
+ }
+
+ fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams(), chunk.NewTags())
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
@@ -161,11 +167,15 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
return nil, nil, nil, nil, err
}
- netStore := storage.NewNetStore(localStore, enode.ID{})
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ localStore.Close()
+ removeDataDir()
+ return nil, nil, nil, nil, err
+ }
delivery := NewDelivery(to, netStore)
- netStore.RemoteGet = delivery.RequestFromPeers
-
+ netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
intervalsStore := state.NewInmemoryStore()
streamer := NewRegistry(addr.ID(), delivery, netStore, intervalsStore, registryOptions, nil)
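The recurring wiring in these helpers — the NetStore gets a FetcherFactory whose request function is Delivery.RequestFromPeers — can be summarised in the sketch below; construction of the local store, Kademlia and registry is elided, and the call signatures are taken from the helper code above:

```go
package example

import (
	"github.com/ethersphere/swarm/chunk"
	"github.com/ethersphere/swarm/network"
	"github.com/ethersphere/swarm/network/stream"
	"github.com/ethersphere/swarm/storage"
)

// wire attaches the retrieval machinery to an already constructed NetStore,
// mirroring the test helpers above.
func wire(netStore *storage.NetStore, kad *network.Kademlia) (*storage.FileStore, *stream.Delivery) {
	delivery := stream.NewDelivery(kad, netStore)
	// every NetStore miss now spawns a Fetcher that forwards retrieve
	// requests through delivery.RequestFromPeers
	netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
	fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams(), chunk.NewTags())
	return fileStore, delivery
}
```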
diff --git a/network/stream/delivery.go b/network/stream/delivery.go
index 794f9c9f03..43740a0514 100644
--- a/network/stream/delivery.go
+++ b/network/stream/delivery.go
@@ -27,9 +27,9 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
- "github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
)
@@ -39,6 +39,9 @@ var (
handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil)
retrieveChunkFail = metrics.NewRegisteredCounter("network.stream.retrieve_chunks_fail.count", nil)
+ requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil)
+ requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil)
+
lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)
)
@@ -59,42 +62,52 @@ func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
type RetrieveRequestMsg struct {
- Addr storage.Address
+ Addr storage.Address
+ SkipCheck bool
+ HopCount uint8
}
func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
- log.Trace("handle retrieve request", "peer", sp.ID(), "hash", req.Addr)
+ log.Trace("received request", "peer", sp.ID(), "hash", req.Addr)
handleRetrieveRequestMsgCount.Inc(1)
- ctx, osp := spancontext.StartSpan(
+ var osp opentracing.Span
+ ctx, osp = spancontext.StartSpan(
ctx,
- "handle.retrieve.request")
+ "stream.handle.retrieve")
osp.LogFields(olog.String("ref", req.Addr.String()))
- defer osp.Finish()
+ var cancel func()
+ // TODO: do something with this hardcoded timeout, maybe use TTL in the future
+ ctx = context.WithValue(ctx, "peer", sp.ID().String())
+ ctx = context.WithValue(ctx, "hopcount", req.HopCount)
+ ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout)
- ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout)
- defer cancel()
+ go func() {
+ select {
+ case <-ctx.Done():
+ case <-d.quit:
+ }
+ cancel()
+ }()
- r := &storage.Request{
- Addr: req.Addr,
- Origin: sp.ID(),
- }
- chunk, err := d.netStore.Get(ctx, chunk.ModeGetRequest, r)
- if err != nil {
- retrieveChunkFail.Inc(1)
- log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "err", err)
- return nil
- }
+ go func() {
+ defer osp.Finish()
+ ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
+ if err != nil {
+ retrieveChunkFail.Inc(1)
+ log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
+ return
+ }
+ syncing := false
- log.Trace("retrieve request, delivery", "ref", req.Addr, "peer", sp.ID())
- syncing := false
- err = sp.Deliver(ctx, chunk, 0, syncing)
- if err != nil {
- log.Error("sp.Deliver errored", "err", err)
- }
- osp.LogFields(olog.Bool("delivered", true))
+ err = sp.Deliver(ctx, ch, Top, syncing)
+ if err != nil {
+ log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
+ }
+ osp.LogFields(olog.Bool("delivered", true))
+ }()
return nil
}
@@ -176,166 +189,57 @@ func (d *Delivery) Close() {
close(d.quit)
}
-// getOriginPo returns the originPo if the incoming Request has an Origin
-// if our node is the first node that requests this chunk, then we don't have an Origin,
-// and return -1
-// this is used only for tracing, and can probably be refactor so that we don't have to
-// iterater over Kademlia
-func (d *Delivery) getOriginPo(req *storage.Request) int {
- originPo := -1
-
- d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
- id := p.ID()
-
- // get po between chunk and origin
- if req.Origin.String() == id.String() {
- originPo = po
- return false
- }
-
- return true
- })
-
- return originPo
-}
-
-// FindPeer is returning the closest peer from Kademlia that a chunk
-// request hasn't already been sent to
-func (d *Delivery) FindPeer(ctx context.Context, req *storage.Request) (*Peer, error) {
+// RequestFromPeers sends a chunk retrieve request to a peer
+// The most eligible peer that hasn't already been sent to is chosen
+// TODO: define "eligible"
+func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
+ requestFromPeersCount.Inc(1)
var sp *Peer
- var err error
+ spID := req.Source
- osp, _ := ctx.Value("remote.fetch").(opentracing.Span)
-
- // originPo - proximity of the node that made the request; -1 if the request originator is our node;
- // myPo - this node's proximity with the requested chunk
- // selectedPeerPo - kademlia suggested node's proximity with the requested chunk (computed further below)
- originPo := d.getOriginPo(req)
- myPo := chunk.Proximity(req.Addr, d.kad.BaseAddr())
- selectedPeerPo := -1
-
- depth := d.kad.NeighbourhoodDepth()
-
- if osp != nil {
- osp.LogFields(olog.Int("originPo", originPo))
- osp.LogFields(olog.Int("depth", depth))
- osp.LogFields(olog.Int("myPo", myPo))
- }
-
- // do not forward requests if origin proximity is bigger than our node's proximity
- // this means that origin is closer to the chunk
- if originPo > myPo {
- return nil, errors.New("not forwarding request, origin node is closer to chunk than this node")
- }
-
- d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
- id := p.ID()
-
- // skip light nodes
- if p.LightNode {
- return true
+ if spID != nil {
+ sp = d.getPeer(*spID)
+ if sp == nil {
+ return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
}
-
- // do not send request back to peer who asked us. maybe merge with SkipPeer at some point
- if req.Origin.String() == id.String() {
- return true
- }
-
- // skip peers that we have already tried
- if req.SkipPeer(id.String()) {
- log.Trace("findpeer skip peer", "peer", id, "ref", req.Addr.String())
- return true
- }
-
- if myPo < depth { // chunk is NOT within the neighbourhood
- if po <= myPo { // always choose a peer strictly closer to chunk than us
- log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
- return false
- } else {
- log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ } else {
+ d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
+ id := p.ID()
+ if p.LightNode {
+ // skip light nodes
+ return true
}
- } else { // chunk IS WITHIN neighbourhood
- if po < depth { // do not select peer outside the neighbourhood. But allows peers further from the chunk than us
- log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
- return false
- } else if po <= originPo { // avoid loop in neighbourhood, so not forward when a request comes from the neighbourhood
- log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
- return false
- } else {
- log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
+ if req.SkipPeer(id.String()) {
+ log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
+ return true
}
- }
-
- // if selected peer is not in the depth (2nd condition; if depth <= po, then peer is in nearest neighbourhood)
- // and they have a lower po than ours, return error
- if po < myPo && depth > po {
- log.Trace("findpeer4 skip peer because origin was closer", "originpo", originPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
-
- err = fmt.Errorf("not asking peers further away from origin; ref=%s originpo=%v po=%v depth=%v myPo=%v", req.Addr.String(), originPo, po, depth, myPo)
+ sp = d.getPeer(id)
+ // sp is nil when we encounter a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol
+ if sp == nil {
+ return true
+ }
+ spID = &id
return false
+ })
+ if sp == nil {
+ return nil, nil, errors.New("no peer found")
}
-
- // if chunk falls in our nearest neighbourhood (1st condition), but suggested peer is not in
- // the nearest neighbourhood (2nd condition), don't forward the request to suggested peer
- if depth <= myPo && depth > po {
- log.Trace("findpeer5 skip peer because depth", "originpo", originPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
-
- err = fmt.Errorf("not going outside of depth; ref=%s originpo=%v po=%v depth=%v myPo=%v", req.Addr.String(), originPo, po, depth, myPo)
- return false
- }
-
- sp = d.getPeer(id)
-
- // sp could be nil, if we encountered a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol
- // if sp is not nil, then we have selected the next peer and we stop iterating
- // if sp is nil, we continue iterating
- if sp != nil {
- selectedPeerPo = po
-
- return false
- }
-
- // continue iterating
- return true
- })
-
- if osp != nil {
- osp.LogFields(olog.Int("selectedPeerPo", selectedPeerPo))
- }
-
- if err != nil {
- return nil, err
- }
-
- if sp == nil {
- return nil, errors.New("no peer found")
- }
-
- return sp, nil
-}
-
-// RequestFromPeers sends a chunk retrieve request to the next found peer
-func (d *Delivery) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
- metrics.GetOrRegisterCounter("delivery.requestfrompeers", nil).Inc(1)
-
- sp, err := d.FindPeer(ctx, req)
- if err != nil {
- log.Trace(err.Error())
- return nil, err
}
// setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip
// this span will finish only when delivery is handled (or times out)
- r := &RetrieveRequestMsg{
- Addr: req.Addr,
- }
- log.Trace("sending retrieve request", "ref", r.Addr, "peer", sp.ID().String(), "origin", localID)
- err = sp.Send(ctx, r)
+ ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
+ ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
+ log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr)
+ err := sp.SendPriority(ctx, &RetrieveRequestMsg{
+ Addr: req.Addr,
+ SkipCheck: req.SkipCheck,
+ HopCount: req.HopCount,
+ }, Top)
if err != nil {
- log.Error(err.Error())
- return nil, err
+ return nil, nil, err
}
+ requestFromPeersEachCount.Inc(1)
- spID := sp.ID()
- return &spID, nil
+ return spID, sp.quit, nil
}
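RequestFromPeers is called in two modes by the Fetcher: with an explicit source (a peer that offered the chunk via syncing) or with no source, in which case Kademlia selects the closest connected peer not in peersToSkip. A hedged sketch of a caller; the helper name is illustrative:

```go
package example

import (
	"context"
	"sync"

	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethersphere/swarm/network"
	"github.com/ethersphere/swarm/network/stream"
	"github.com/ethersphere/swarm/storage"
)

// requestChunk asks delivery to forward a retrieve request for addr.
// The returned channel is the selected peer's quit channel, which the Fetcher
// watches in order to fall back to another peer if this one disconnects.
func requestChunk(d *stream.Delivery, addr storage.Address, source *enode.ID, peersToSkip *sync.Map) (*enode.ID, chan struct{}, error) {
	req := network.NewRequest(addr, true, peersToSkip)
	req.Source = source // nil means: let Kademlia pick the closest eligible peer
	return d.RequestFromPeers(context.Background(), req)
}
```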
diff --git a/network/stream/delivery_test.go b/network/stream/delivery_test.go
index 1e4d5a1254..3f124522ca 100644
--- a/network/stream/delivery_test.go
+++ b/network/stream/delivery_test.go
@@ -19,17 +19,26 @@ package stream
import (
"bytes"
"context"
+ "errors"
+ "fmt"
+ "sync"
"testing"
"time"
+ "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
pq "github.com/ethersphere/swarm/network/priorityqueue"
+ "github.com/ethersphere/swarm/network/simulation"
"github.com/ethersphere/swarm/p2p/protocols"
+ "github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
+ "github.com/ethersphere/swarm/testutil"
)
//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
@@ -151,12 +160,18 @@ func TestRequestFromPeers(t *testing.T) {
streamer: r,
}
r.setPeer(sp)
- req := storage.NewRequest(storage.Address(hash0[:]))
- id, err := delivery.FindPeer(context.TODO(), req)
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+ ctx := context.Background()
+ id, _, err := delivery.RequestFromPeers(ctx, req)
+
if err != nil {
t.Fatal(err)
}
- if id.ID() != dummyPeerID {
+ if *id != dummyPeerID {
t.Fatalf("Expected an id, got %v", id)
}
}
@@ -186,10 +201,15 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
}
r.setPeer(sp)
- req := storage.NewRequest(storage.Address(hash0[:]))
+ req := network.NewRequest(
+ storage.Address(hash0[:]),
+ true,
+ &sync.Map{},
+ )
+ ctx := context.Background()
// making a request which should return with "no peer found"
- _, err := delivery.FindPeer(context.TODO(), req)
+ _, _, err := delivery.RequestFromPeers(ctx, req)
expectedError := "no peer found"
if err.Error() != expectedError {
@@ -280,3 +300,293 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
}
}
+
+func TestDeliveryFromNodes(t *testing.T) {
+ testDeliveryFromNodes(t, 2, dataChunkCount, true)
+ testDeliveryFromNodes(t, 2, dataChunkCount, false)
+ testDeliveryFromNodes(t, 4, dataChunkCount, true)
+ testDeliveryFromNodes(t, 4, dataChunkCount, false)
+
+ if testutil.RaceEnabled {
+ // Travis cannot handle more nodes with -race; would time out.
+ return
+ }
+
+ testDeliveryFromNodes(t, 8, dataChunkCount, true)
+ testDeliveryFromNodes(t, 8, dataChunkCount, false)
+ testDeliveryFromNodes(t, 16, dataChunkCount, true)
+ testDeliveryFromNodes(t, 16, dataChunkCount, false)
+}
+
+func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
+ t.Helper()
+ t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ Syncing: SyncingDisabled,
+ }, nil)
+ bucket.Store(bucketKeyRegistry, r)
+
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+
+ return r, cleanup, nil
+ },
+ })
+ defer sim.Close()
+
+ log.Info("Adding nodes to simulation")
+ _, err := sim.AddNodesAndConnectChain(nodes)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ log.Info("Starting simulation")
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ nodeIDs := sim.UpNodeIDs()
+ //determine the pivot node to be the first node of the simulation
+ pivot := nodeIDs[0]
+
+ //distribute chunks of a random file into Stores of nodes 1 to nodes
+ //we will do this by creating a file store with an underlying round-robin store:
+ //the file store will create a hash for the uploaded file, but every chunk will be
+ //distributed to different nodes via round-robin scheduling
+ log.Debug("Writing file to round-robin file store")
+ //to do this, we create an array for chunkstores (length minus one, the pivot node)
+ stores := make([]storage.ChunkStore, len(nodeIDs)-1)
+ //we then need to get all stores from the sim....
+ lStores := sim.NodesItems(bucketKeyStore)
+ i := 0
+ //...iterate the buckets...
+ for id, bucketVal := range lStores {
+ //...and remove the one which is the pivot node
+ if id == pivot {
+ continue
+ }
+ //the other ones are added to the array...
+ stores[i] = bucketVal.(storage.ChunkStore)
+ i++
+ }
+ //...which then gets passed to the round-robin file store
+ roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags())
+ //now we can actually upload a (random) file to the round-robin store
+ size := chunkCount * chunkSize
+ log.Debug("Storing data to file store")
+ fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
+ // wait until all chunks stored
+ if err != nil {
+ return err
+ }
+ err = wait(ctx)
+ if err != nil {
+ return err
+ }
+
+ //get the pivot node's filestore
+ item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
+ if !ok {
+ return fmt.Errorf("No filestore")
+ }
+ pivotFileStore := item.(*storage.FileStore)
+ log.Debug("Starting retrieval routine")
+ retErrC := make(chan error)
+ go func() {
+ // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
+ // we must wait for the peer connections to have started before requesting
+ n, err := readAll(pivotFileStore, fileHash)
+ log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
+ retErrC <- err
+ }()
+
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
+ }
+ }()
+
+ //finally check that the pivot node gets all chunks via the root hash
+ log.Debug("Check retrieval")
+ success := true
+ var total int64
+ total, err = readAll(pivotFileStore, fileHash)
+ if err != nil {
+ return err
+ }
+ log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
+ if err != nil || total != int64(size) {
+ success = false
+ }
+
+ if !success {
+ return fmt.Errorf("Test failed, chunks not available on all nodes")
+ }
+ if err := <-retErrC; err != nil {
+ return fmt.Errorf("requesting chunks: %v", err)
+ }
+ log.Debug("Test terminated successfully")
+ return nil
+ })
+ if result.Error != nil {
+ t.Fatal(result.Error)
+ }
+ })
+}
+
+func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
+ for chunks := 32; chunks <= 128; chunks *= 2 {
+ for i := 2; i < 32; i *= 2 {
+ b.Run(
+ fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
+ func(b *testing.B) {
+ benchmarkDeliveryFromNodes(b, i, chunks, true)
+ },
+ )
+ }
+ }
+}
+
+func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
+ for chunks := 32; chunks <= 128; chunks *= 2 {
+ for i := 2; i < 32; i *= 2 {
+ b.Run(
+ fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
+ func(b *testing.B) {
+ benchmarkDeliveryFromNodes(b, i, chunks, false)
+ },
+ )
+ }
+ }
+}
+
+func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
+ sim := simulation.New(map[string]simulation.ServiceFunc{
+ "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+ addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+ SkipCheck: skipCheck,
+ Syncing: SyncingDisabled,
+ SyncUpdateDelay: 0,
+ }, nil)
+ bucket.Store(bucketKeyRegistry, r)
+
+ cleanup = func() {
+ r.Close()
+ clean()
+ }
+
+ return r, cleanup, nil
+ },
+ })
+ defer sim.Close()
+
+ log.Info("Initializing test config")
+ _, err := sim.AddNodesAndConnectChain(nodes)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+ nodeIDs := sim.UpNodeIDs()
+ node := nodeIDs[len(nodeIDs)-1]
+
+ item, ok := sim.NodeItem(node, bucketKeyFileStore)
+ if !ok {
+ return errors.New("No filestore")
+ }
+ remoteFileStore := item.(*storage.FileStore)
+
+ pivotNode := nodeIDs[0]
+ item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
+ if !ok {
+ return errors.New("No filestore")
+ }
+ netStore := item.(*storage.NetStore)
+
+ if _, err := sim.WaitTillHealthy(ctx); err != nil {
+ return err
+ }
+
+ disconnected := watchDisconnections(ctx, sim)
+ defer func() {
+ if err != nil && disconnected.bool() {
+ err = errors.New("disconnect events received")
+ }
+ }()
+ // benchmark loop
+ b.ResetTimer()
+ b.StopTimer()
+ Loop:
+ for i := 0; i < b.N; i++ {
+ // uploading chunkCount random chunks to the last node
+ hashes := make([]storage.Address, chunkCount)
+ for i := 0; i < chunkCount; i++ {
+ // create actual size real chunks
+ ctx := context.TODO()
+ hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
+ if err != nil {
+ return fmt.Errorf("store: %v", err)
+ }
+ // wait until all chunks stored
+ err = wait(ctx)
+ if err != nil {
+ return fmt.Errorf("wait store: %v", err)
+ }
+ // collect the hashes
+ hashes[i] = hash
+ }
+ // now benchmark the actual retrieval
+ // netstore.Get is called for each hash in a go routine and errors are collected
+ b.StartTimer()
+ errs := make(chan error)
+ for _, hash := range hashes {
+ go func(h storage.Address) {
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, h)
+ log.Warn("test check netstore get", "hash", h, "err", err)
+ errs <- err
+ }(hash)
+ }
+ // count and report retrieval errors
+ // if there are misses then chunk timeout is too low for the distance and volume (?)
+ var total, misses int
+ for err := range errs {
+ if err != nil {
+ log.Warn(err.Error())
+ misses++
+ }
+ total++
+ if total == chunkCount {
+ break
+ }
+ }
+ b.StopTimer()
+
+ if misses > 0 {
+ err = fmt.Errorf("%v chunk not found out of %v", misses, total)
+ break Loop
+ }
+ }
+ return err
+ })
+ if result.Error != nil {
+ b.Fatal(result.Error)
+ }
+
+}
diff --git a/network/stream/intervals_test.go b/network/stream/intervals_test.go
index 49972e54e6..2479adbf66 100644
--- a/network/stream/intervals_test.go
+++ b/network/stream/intervals_test.go
@@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethersphere/swarm/network/simulation"
- "github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil"
@@ -76,7 +75,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalClient(netStore), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
- return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys), nil
+ return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
})
cleanup := func() {
@@ -299,42 +298,38 @@ func newTestExternalClient(netStore *storage.NetStore) *testExternalClient {
}
}
-func (c *testExternalClient) NeedData(ctx context.Context, key []byte) (bool, func(context.Context) error) {
- fi, loaded, ok := c.netStore.GetOrCreateFetcher(ctx, key, "syncer")
- if !ok {
- return loaded, nil
+func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
+ wait := c.netStore.FetchFunc(ctx, storage.Address(hash))
+ if wait == nil {
+ return nil
}
-
select {
- case c.hashes <- key:
+ case c.hashes <- hash:
case <-ctx.Done():
log.Warn("testExternalClient NeedData context", "err", ctx.Err())
- return false, func(_ context.Context) error {
+ return func(_ context.Context) error {
return ctx.Err()
}
}
-
- return loaded, func(ctx context.Context) error {
- select {
- case <-fi.Delivered:
- case <-time.After(timeouts.SyncerClientWaitTimeout):
- return fmt.Errorf("chunk not delivered through syncing after %dsec. ref=%s", timeouts.SyncerClientWaitTimeout, fmt.Sprintf("%x", key))
- }
- return nil
- }
+ return wait
}
func (c *testExternalClient) Close() {}
type testExternalServer struct {
t string
+ keyFunc func(key []byte, index uint64)
sessionAt uint64
maxKeys uint64
}
-func newTestExternalServer(t string, sessionAt, maxKeys uint64) *testExternalServer {
+func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer {
+ if keyFunc == nil {
+ keyFunc = binary.BigEndian.PutUint64
+ }
return &testExternalServer{
t: t,
+ keyFunc: keyFunc,
sessionAt: sessionAt,
maxKeys: maxKeys,
}
@@ -350,7 +345,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6
}
b := make([]byte, HashSize*(to-from+1))
for i := from; i <= to; i++ {
- binary.BigEndian.PutUint64(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
+ s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
}
return b, from, to, nil
}
diff --git a/network/stream/messages.go b/network/stream/messages.go
index 33320db289..cb51229b80 100644
--- a/network/stream/messages.go
+++ b/network/stream/messages.go
@@ -225,14 +225,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
for i := 0; i < lenHashes; i += HashSize {
hash := hashes[i : i+HashSize]
- log.Trace("checking offered hash", "ref", fmt.Sprintf("%x", hash))
-
- if _, wait := c.NeedData(ctx, hash); wait != nil {
+ if wait := c.NeedData(ctx, hash); wait != nil {
ctr++
-
- // set the bit, so create a request
want.Set(i/HashSize, true)
- log.Trace("need data", "ref", fmt.Sprintf("%x", hash), "request", true)
// measure how long it takes before we mark chunks for retrieval, and actually send the request
if !wantDelaySet {
diff --git a/network/stream/snapshot_sync_test.go b/network/stream/snapshot_sync_test.go
index 39d5920273..a60d6bcfde 100644
--- a/network/stream/snapshot_sync_test.go
+++ b/network/stream/snapshot_sync_test.go
@@ -63,8 +63,8 @@ const (
// Tests in this file should not request chunks from peers.
// This function will panic indicating that there is a problem if request has been made.
-func dummyRequestFromPeers(_ context.Context, req *storage.Request, _ enode.ID) (*enode.ID, error) {
- panic(fmt.Sprintf("unexpected request: address %s", req.Addr.String()))
+func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
+ panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String()))
}
//This test is a syncing test for nodes.
diff --git a/network/stream/stream.go b/network/stream/stream.go
index 3465acae75..6c3dbd0152 100644
--- a/network/stream/stream.go
+++ b/network/stream/stream.go
@@ -543,7 +543,7 @@ func (c *client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
- NeedData(context.Context, []byte) (bool, func(context.Context) error)
+ NeedData(context.Context, []byte) func(context.Context) error
Close()
}
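With the restored signature, NeedData returns nil when the chunk is already available locally and otherwise a wait function that blocks until the chunk is delivered (or the context expires). A sketch of the calling pattern used by handleOfferedHashesMsg above; the helper name is illustrative:

```go
package example

import (
	"context"

	"github.com/ethersphere/swarm/network/stream"
)

// needAndWait marks the hash as wanted if necessary and blocks until the
// corresponding chunk is delivered or the context is done.
func needAndWait(ctx context.Context, c stream.Client, hash []byte) error {
	wait := c.NeedData(ctx, hash)
	if wait == nil {
		return nil // chunk already available locally, nothing to fetch
	}
	return wait(ctx)
}
```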
diff --git a/network/stream/streamer_test.go b/network/stream/streamer_test.go
index 4e01056204..c631900539 100644
--- a/network/stream/streamer_test.go
+++ b/network/stream/streamer_test.go
@@ -93,20 +93,20 @@ func newTestClient(t string) *testClient {
}
}
-func (self *testClient) NeedData(ctx context.Context, hash []byte) (bool, func(context.Context) error) {
+func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
self.receivedHashes[string(hash)] = hash
if bytes.Equal(hash, hash0[:]) {
- return false, func(context.Context) error {
+ return func(context.Context) error {
<-self.wait0
return nil
}
} else if bytes.Equal(hash, hash2[:]) {
- return false, func(context.Context) error {
+ return func(context.Context) error {
<-self.wait2
return nil
}
}
- return false, nil
+ return nil
}
func (self *testClient) Close() {}
diff --git a/network/stream/syncer.go b/network/stream/syncer.go
index 6fc1202ad3..f73d959ce8 100644
--- a/network/stream/syncer.go
+++ b/network/stream/syncer.go
@@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
- "github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/storage"
)
@@ -74,7 +73,7 @@ func (s *SwarmSyncerServer) Close() {
// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
- ch, err := s.netStore.Store.Get(ctx, chunk.ModeGetSync, storage.Address(key))
+ ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
if err != nil {
return nil, err
}
@@ -199,24 +198,9 @@ func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
})
}
-func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (loaded bool, wait func(context.Context) error) {
- start := time.Now()
-
- fi, loaded, ok := s.netStore.GetOrCreateFetcher(ctx, key, "syncer")
- if !ok {
- return loaded, nil
- }
-
- return loaded, func(ctx context.Context) error {
- select {
- case <-fi.Delivered:
- metrics.GetOrRegisterResettingTimer(fmt.Sprintf("fetcher.%s.syncer", fi.CreatedBy), nil).UpdateSince(start)
- case <-time.After(timeouts.SyncerClientWaitTimeout):
- metrics.GetOrRegisterCounter("fetcher.syncer.timeout", nil).Inc(1)
- return fmt.Errorf("chunk not delivered through syncing after %dsec. ref=%s", timeouts.SyncerClientWaitTimeout, fmt.Sprintf("%x", key))
- }
- return nil
- }
+// NeedData returns a wait function for the chunk identified by key, or nil if the chunk is already available locally
+func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
+ return s.netStore.FetchFunc(ctx, key)
}
func (s *SwarmSyncerClient) Close() {}
diff --git a/network/timeouts/timeouts.go b/network/timeouts/timeouts.go
deleted file mode 100644
index b92af3b47b..0000000000
--- a/network/timeouts/timeouts.go
+++ /dev/null
@@ -1,24 +0,0 @@
-package timeouts
-
-import "time"
-
-// FailedPeerSkipDelay is the time we consider a peer to be skipped for a particular request/chunk,
-// because this peer failed to deliver it during the SearchTimeout interval
-var FailedPeerSkipDelay = 20 * time.Second
-
-// FetcherGlobalTimeout is the max time a node tries to find a chunk for a client, after which it returns a 404
-// Basically this is the amount of time a singleflight request for a given chunk lives
-var FetcherGlobalTimeout = 10 * time.Second
-
-// SearchTimeout is the max time requests wait for a peer to deliver a chunk, after which another peer is tried
-var SearchTimeout = 500 * time.Millisecond
-
-// SyncerClientWaitTimeout is the max time a syncer client waits for a chunk to be delivered during syncing
-var SyncerClientWaitTimeout = 20 * time.Second
-
-// Within handleOfferedHashesMsg - how long to wait for a given batch of chunks to be delivered by the peer offering them
-var SyncBatchTimeout = 10 * time.Second
-
-// Within SwarmSyncerServer - If at least one chunk is added to the batch and no new chunks
-// are added in BatchTimeout period, the batch will be returned.
-var BatchTimeout = 2 * time.Second
diff --git a/storage/feed/handler.go b/storage/feed/handler.go
index cbaaac1819..5b4bb60507 100644
--- a/storage/feed/handler.go
+++ b/storage/feed/handler.go
@@ -192,13 +192,12 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error)
ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout)
defer cancel()
- r := storage.NewRequest(id.Addr())
- ch, err := h.chunkStore.Get(ctx, chunk.ModeGetLookup, r)
+ ch, err := h.chunkStore.Get(ctx, chunk.ModeGetLookup, id.Addr())
if err != nil {
- if err == context.DeadlineExceeded || err == storage.ErrNoSuitablePeer { // chunk not found
+ if err == context.DeadlineExceeded { // chunk not found
return nil, nil
}
- return nil, err
+ return nil, err // something else happened or the context was cancelled
}
var request Request
diff --git a/storage/feed/lookup/lookup.go b/storage/feed/lookup/lookup.go
index 9e9fa56588..4b233a0e07 100644
--- a/storage/feed/lookup/lookup.go
+++ b/storage/feed/lookup/lookup.go
@@ -58,8 +58,6 @@ var TimeAfter = time.After
// It should return if a value is found, but its timestamp is higher than "now"
// It should only return an error in case the handler wants to stop the
// lookup process entirely.
-// If the context is canceled, it must return context.Canceled
-
type ReadFunc func(ctx context.Context, epoch Epoch, now uint64) (interface{}, error)
// NoClue is a hint that can be provided when the Lookup caller does not have
diff --git a/storage/feed/testutil.go b/storage/feed/testutil.go
index a1e9cf030c..7e2a73f85c 100644
--- a/storage/feed/testutil.go
+++ b/storage/feed/testutil.go
@@ -18,8 +18,8 @@ package feed
import (
"context"
- "errors"
"path/filepath"
+ "sync"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethersphere/swarm/chunk"
@@ -39,6 +39,17 @@ func (t *TestHandler) Close() {
t.chunkStore.Close()
}
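+// mockNetFetcher is a no-op NetFetcher used by NewTestHandler; feed tests never fetch chunks from the network.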
+type mockNetFetcher struct{}
+
+func (m *mockNetFetcher) Request(hopCount uint8) {
+}
+func (m *mockNetFetcher) Offer(source *enode.ID) {
+}
+
+func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher {
+ return &mockNetFetcher{}
+}
+
// NewTestHandler creates Handler object to be used for testing purposes.
func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) {
path := filepath.Join(datadir, testDbDirName)
@@ -51,10 +62,11 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error)
localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh)
- netStore := storage.NewNetStore(localStore, enode.ID{})
- netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
- return nil, errors.New("not found")
+ netStore, err := storage.NewNetStore(localStore, nil)
+ if err != nil {
+ return nil, err
}
+ netStore.NewNetFetcherFunc = newFakeNetFetcher
fh.SetStore(netStore)
return &TestHandler{fh}, nil
}
diff --git a/storage/filestore.go b/storage/filestore.go
index ff5ffe3970..2c2a625a18 100644
--- a/storage/filestore.go
+++ b/storage/filestore.go
@@ -39,8 +39,9 @@ implementation for storage or retrieval.
*/
const (
- defaultLDBCapacity = 5000000 // capacity for LevelDB, by default 5*10^6*4096 bytes == 20GB
- defaultCacheCapacity = 10000 // capacity for in-memory chunks' cache
+ defaultLDBCapacity = 5000000 // capacity for LevelDB, by default 5*10^6*4096 bytes == 20GB
+ defaultCacheCapacity = 10000 // capacity for in-memory chunks' cache
+ defaultChunkRequestsCacheCapacity = 5000000 // capacity for container holding outgoing requests for chunks. should be set to LevelDB capacity
)
type FileStore struct {
diff --git a/storage/lnetstore.go b/storage/lnetstore.go
deleted file mode 100644
index f13c99b5fa..0000000000
--- a/storage/lnetstore.go
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright 2016 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package storage
-
-import (
- "context"
-
- "github.com/ethersphere/swarm/chunk"
- "github.com/ethersphere/swarm/network/timeouts"
-)
-
-// LNetStore is a wrapper of NetStore, which implements the chunk.Store interface. It is used only by the FileStore,
-// the component used by the Swarm API to store and retrieve content and to split and join chunks.
-type LNetStore struct {
- *NetStore
-}
-
-// NewLNetStore is a constructor for LNetStore
-func NewLNetStore(store *NetStore) *LNetStore {
- return &LNetStore{
- NetStore: store,
- }
-}
-
-// Get converts a chunk reference to a chunk Request (with empty Origin), handled by the NetStore, and
-// returns the requested chunk, or error.
-func (n *LNetStore) Get(ctx context.Context, mode chunk.ModeGet, ref Address) (ch Chunk, err error) {
- ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout)
- defer cancel()
-
- return n.NetStore.Get(ctx, mode, NewRequest(ref))
-}
diff --git a/storage/netstore.go b/storage/netstore.go
index 8f92d7aefd..ac4a33d4b9 100644
--- a/storage/netstore.go
+++ b/storage/netstore.go
@@ -18,291 +18,318 @@ package storage
import (
"context"
- "errors"
+ "encoding/hex"
"fmt"
"sync"
+ "sync/atomic"
"time"
+ "github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
- "github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/spancontext"
- lru "github.com/hashicorp/golang-lru"
-
- "github.com/ethereum/go-ethereum/metrics"
- "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
"github.com/syndtr/goleveldb/leveldb"
- "golang.org/x/sync/singleflight"
+
+ lru "github.com/hashicorp/golang-lru"
)
-const (
- // capacity for the fetchers LRU cache
- fetchersCapacity = 500000
+type (
+ NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher
)
-var (
- ErrNoSuitablePeer = errors.New("no suitable peer")
-)
-
-// Fetcher is a struct which maintains state of remote requests.
-// Fetchers are stored in fetchers map and signal to all interested parties if a given chunk is delivered
-// the mutex controls who closes the channel, and make sure we close the channel only once
-type Fetcher struct {
- Delivered chan struct{} // when closed, it means that the chunk this Fetcher refers to is delivered
-
- // it is possible for multiple actors to be delivering the same chunk,
- // for example through syncing and through retrieve request. however we want the `Delivered` channel to be closed only
- // once, even if we put the same chunk multiple times in the NetStore.
- once sync.Once
-
- CreatedAt time.Time // timestamp when the fetcher was created, used for metrics measuring lifetime of fetchers
- CreatedBy string // who created the fetcher - "request" or "syncing", used for metrics measuring lifecycle of fetchers
-
- RequestedBySyncer bool // whether we have issued at least once a request through Offered/Wanted hashes flow
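+// NetFetcher is the interface NetStore uses to trigger network retrieval of a chunk:
+// Request issues a retrieve request to peers (with the given hop count) and Offer
+// notifies the fetcher of a source peer that has offered the chunk.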
+type NetFetcher interface {
+ Request(hopCount uint8)
+ Offer(source *enode.ID)
}
-// NewFetcher is a constructor for a Fetcher
-func NewFetcher() *Fetcher {
- return &Fetcher{make(chan struct{}), sync.Once{}, time.Now(), "", false}
-}
-
-// SafeClose signals to interested parties (those waiting for a signal on fi.Delivered) that a chunk is delivered.
-// It closes the fi.Delivered channel through the sync.Once object, because it is possible for a chunk to be
-// delivered multiple times concurrently.
-func (fi *Fetcher) SafeClose() {
- fi.once.Do(func() {
- close(fi.Delivered)
- })
-}
-
-type RemoteGetFunc func(ctx context.Context, req *Request, localID enode.ID) (*enode.ID, error)
-
-// NetStore is an extension of LocalStore
+// NetStore is an extension of local storage
// it implements the ChunkStore interface
-// on request it initiates remote cloud retrieval
+// on request it initiates remote cloud retrieval using a fetcher
+// fetchers are unique to a chunk and are stored in the fetchers LRU memory cache
+// NewNetFetcherFunc is a constructor that creates a NetFetcher for a specific chunk address
type NetStore struct {
chunk.Store
- localID enode.ID // our local enode - used when issuing RetrieveRequests
- fetchers *lru.Cache
- putMu sync.Mutex
- requestGroup singleflight.Group
- RemoteGet RemoteGetFunc
+ mu sync.Mutex
+ fetchers *lru.Cache
+ NewNetFetcherFunc NewNetFetcherFunc
+ closeC chan struct{}
}
-// NewNetStore creates a new NetStore using the provided chunk.Store and localID of the node.
-func NewNetStore(store chunk.Store, localID enode.ID) *NetStore {
- fetchers, _ := lru.New(fetchersCapacity)
+var fetcherTimeout = 2 * time.Minute // timeout to cancel the fetcher even if requests are coming in
- return &NetStore{
- fetchers: fetchers,
- Store: store,
- localID: localID,
+// NewNetStore creates a new NetStore object using the given local store. nnf is a
+// constructor function that creates a NetFetcher for a specific chunk address.
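+//
+// A minimal construction sketch (localStore and newFetcherFunc are placeholders for
+// caller-provided values, not names defined in this package):
+//
+//   netStore, err := NewNetStore(localStore, newFetcherFunc)
+//   if err != nil {
+//       // handle error
+//   }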
+func NewNetStore(store chunk.Store, nnf NewNetFetcherFunc) (*NetStore, error) {
+ fetchers, err := lru.New(defaultChunkRequestsCacheCapacity)
+ if err != nil {
+ return nil, err
}
+ return &NetStore{
+ Store: store,
+ fetchers: fetchers,
+ NewNetFetcherFunc: nnf,
+ closeC: make(chan struct{}),
+ }, nil
}
// Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in
// the fetchers cache
func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) (bool, error) {
- n.putMu.Lock()
- defer n.putMu.Unlock()
+ n.mu.Lock()
+ defer n.mu.Unlock()
- log.Trace("netstore.put", "ref", ch.Address().String(), "mode", mode)
-
- // put the chunk to the localstore, there should be no error
+ // put the chunk to the store, there should be no error
exists, err := n.Store.Put(ctx, mode, ch)
if err != nil {
return exists, err
}
- // notify RemoteGet (or SwarmSyncerClient) about a chunk delivery and it being stored
- fi, ok := n.fetchers.Get(ch.Address().String())
- if ok {
- // we need SafeClose, because it is possible for a chunk to both be
- // delivered through syncing and through a retrieve request
- fii := fi.(*Fetcher)
- fii.SafeClose()
- log.Trace("netstore.put chunk delivered and stored", "ref", ch.Address().String())
-
- metrics.GetOrRegisterResettingTimer(fmt.Sprintf("netstore.fetcher.lifetime.%s", fii.CreatedBy), nil).UpdateSince(fii.CreatedAt)
-
- // helper snippet to log if a chunk took way to long to be delivered
- slowChunkDeliveryThreshold := 5 * time.Second
- if time.Since(fii.CreatedAt) > slowChunkDeliveryThreshold {
- log.Trace("netstore.put slow chunk delivery", "ref", ch.Address().String())
- }
-
- n.fetchers.Remove(ch.Address().String())
+ // now that the chunk is in the store, check if there was an active fetcher and call deliver on it
+ // (this delivers the chunk to requestors via the fetcher)
+ log.Trace("n.getFetcher", "ref", ch.Address())
+ if f := n.getFetcher(ch.Address()); f != nil {
+ log.Trace("n.getFetcher deliver", "ref", ch.Address())
+ f.deliver(ctx, ch)
}
-
return exists, nil
}
+// Get retrieves the chunk from the NetStore DPA synchronously.
+// It calls NetStore.get, and if the chunk is not in local storage
+// it calls fetch with the request, which blocks until the chunk
+// arrives or the context is done
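+//
+// For example, a caller holding a chunk reference ref (hypothetical) might do:
+//
+//   ch, err := netStore.Get(ctx, chunk.ModeGetRequest, ref) // blocks until delivered or ctx is done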
+func (n *NetStore) Get(rctx context.Context, mode chunk.ModeGet, ref Address) (Chunk, error) {
+ chunk, fetch, err := n.get(rctx, mode, ref)
+ if err != nil {
+ return nil, err
+ }
+ if chunk != nil {
+ // this is not measuring how long it takes to get the chunk from the localstore, but
+ // rather just adding a span for clarity when inspecting traces in Jaeger, in order
+ // to make it easier to reason about which node actually delivered a chunk.
+ _, sp := spancontext.StartSpan(
+ rctx,
+ "localstore.get")
+ defer sp.Finish()
+
+ return chunk, nil
+ }
+ return fetch(rctx)
+}
+
+// FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function,
+// which returns after the chunk is available or the context is done
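+//
+// A typical caller, e.g. a syncer client, might use it as (ref is a hypothetical chunk address):
+//
+//   if wait := netStore.FetchFunc(ctx, ref); wait != nil {
+//       if err := wait(ctx); err != nil {
+//           // the chunk was not delivered before ctx was done
+//       }
+//   }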
+func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error {
+ chunk, fetch, _ := n.get(ctx, chunk.ModeGetRequest, ref)
+ if chunk != nil {
+ return nil
+ }
+ return func(ctx context.Context) error {
+ _, err := fetch(ctx)
+ return err
+ }
+}
+
// Close chunk store
-func (n *NetStore) Close() error {
+func (n *NetStore) Close() (err error) {
+ close(n.closeC)
+
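+ // cancel all pending fetchers and wait until each one has either delivered or been cancelled before closing the underlying store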
+ wg := sync.WaitGroup{}
+ for _, key := range n.fetchers.Keys() {
+ if f, ok := n.fetchers.Get(key); ok {
+ if fetch, ok := f.(*fetcher); ok {
+ wg.Add(1)
+ go func(fetch *fetcher) {
+ defer wg.Done()
+ fetch.cancel()
+
+ select {
+ case <-fetch.deliveredC:
+ case <-fetch.cancelledC:
+ }
+ }(fetch)
+ }
+ }
+ }
+ wg.Wait()
+
return n.Store.Close()
}
-// Get retrieves a chunk
-// If it is not found in the LocalStore then it uses RemoteGet to fetch from the network.
-func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (Chunk, error) {
- metrics.GetOrRegisterCounter("netstore.get", nil).Inc(1)
- start := time.Now()
+// get attempts to retrieve the chunk from LocalStore.
+// If it is not found, it uses getOrCreateFetcher:
+// 1. either there is already a fetcher to retrieve it
+// 2. or a new fetcher is created and saved in the fetchers cache
+// From here on, all Get calls hit this fetcher until the chunk is delivered
+// or all fetcher contexts are done.
+// It returns a chunk, a fetch function and an error.
+// If chunk is nil, the returned fetch function needs to be called with a context to return the chunk.
+func (n *NetStore) get(ctx context.Context, mode chunk.ModeGet, ref Address) (Chunk, func(context.Context) (Chunk, error), error) {
+ n.mu.Lock()
+ defer n.mu.Unlock()
- ref := req.Addr
-
- log.Trace("netstore.get", "ref", ref.String())
-
- ch, err := n.Store.Get(ctx, mode, ref)
+ chunk, err := n.Store.Get(ctx, mode, ref)
if err != nil {
- // TODO: fix comparison - we should be comparing against leveldb.ErrNotFound, this error should be wrapped.
+ // TODO: Fix comparison - we should be comparing against leveldb.ErrNotFound, this error should be wrapped.
if err != ErrChunkNotFound && err != leveldb.ErrNotFound {
- log.Error("localstore get error", "err", err)
+ log.Debug("Received error from LocalStore other than ErrNotFound", "err", err)
}
+ // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one
+ // if it doesn't exist yet
+ f := n.getOrCreateFetcher(ctx, ref)
+ // If the caller needs the chunk, it has to use the returned fetch function to get it
+ return nil, f.Fetch, nil
+ }
- log.Trace("netstore.chunk-not-in-localstore", "ref", ref.String())
+ return chunk, nil, nil
+}
- v, err, _ := n.requestGroup.Do(ref.String(), func() (interface{}, error) {
- // currently we issue a retrieve request if a fetcher
- // has already been created by a syncer for that particular chunk.
- // so it is possible to
- // have 2 in-flight requests for the same chunk - one by a
- // syncer (offered/wanted/deliver flow) and one from
- // here - retrieve request
- fi, _, ok := n.GetOrCreateFetcher(ctx, ref, "request")
- if ok {
- err := n.RemoteFetch(ctx, req, fi)
- if err != nil {
- return nil, err
- }
- }
+// getOrCreateFetcher attempts to retrieve an existing fetcher;
+// if none exists, creates one and saves it in the fetchers cache
+// caller must hold the lock
+func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher {
+ if f := n.getFetcher(ref); f != nil {
+ return f
+ }
- ch, err := n.Store.Get(ctx, mode, ref)
- if err != nil {
- log.Error(err.Error(), "ref", ref)
- return nil, errors.New("item should have been in localstore, but it is not")
- }
+ // no fetcher for the given address, we have to create a new one
+ key := hex.EncodeToString(ref)
+ // create the context during which fetching is kept alive
+ cctx, cancel := context.WithTimeout(ctx, fetcherTimeout)
+ // destroy is called when all requests finish
+ destroy := func() {
+ // remove fetcher from fetchers
+ n.fetchers.Remove(key)
+ // stop the fetcher by cancelling its context; this happens when
+ // all requests are cancelled/timed out or the chunk is delivered
+ cancel()
+ }
+ // peers always stores all the peers which have an active request for the chunk. It is shared
+ // between the fetcher and the NetFetcher created by NewNetFetcherFunc, because
+ // the peers which requested the chunk should not be requested to deliver it.
+ peers := &sync.Map{}
- // fi could be nil (when ok == false) if the chunk was added to the NetStore between n.store.Get and the call to n.GetOrCreateFetcher
- if fi != nil {
- metrics.GetOrRegisterResettingTimer(fmt.Sprintf("fetcher.%s.request", fi.CreatedBy), nil).UpdateSince(start)
- }
+ cctx, sp := spancontext.StartSpan(
+ cctx,
+ "netstore.fetcher",
+ )
- return ch, nil
- })
+ sp.LogFields(olog.String("ref", ref.String()))
+ fetcher := newFetcher(sp, ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC)
+ n.fetchers.Add(key, fetcher)
- if err != nil {
- log.Trace(err.Error(), "ref", ref)
+ return fetcher
+}
+
+// getFetcher retrieves the fetcher for the given address from the fetchers cache if it exists,
+// otherwise it returns nil
+func (n *NetStore) getFetcher(ref Address) *fetcher {
+ key := hex.EncodeToString(ref)
+ f, ok := n.fetchers.Get(key)
+ if ok {
+ return f.(*fetcher)
+ }
+ return nil
+}
+
+// RequestsCacheLen returns the current number of outgoing requests stored in the cache
+func (n *NetStore) RequestsCacheLen() int {
+ return n.fetchers.Len()
+}
+
+// One fetcher object is responsible for fetching one chunk for one address, and keeps track of all the
+// peers who have requested it and did not receive it yet.
+type fetcher struct {
+ addr Address // address of chunk
+ chunk Chunk // fetcher can set the chunk on the fetcher
+ deliveredC chan struct{} // chan signalling chunk delivery to requests
+ cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore)
+ netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context
+ cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are done
+ peers *sync.Map // the peers which asked for the chunk
+ requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called
+ deliverOnce *sync.Once // guarantees that we only close deliveredC once
+ span opentracing.Span // measure retrieve time per chunk
+}
+
+// newFetcher creates a new fetcher object for the given addr. fetch is the function which actually
+// does the retrieval (in non-test cases this is coming from the network package). The cancel function is
+// called either
+// 1. when the chunk has been fetched and all peers have been either notified or their contexts are done
+// 2. when the chunk has not been fetched but the contexts of all the requests are done
+// The peers map stores all the peers which have requested the chunk.
+func newFetcher(span opentracing.Span, addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher {
+ cancelOnce := &sync.Once{} // cancel should only be called once
+ return &fetcher{
+ addr: addr,
+ deliveredC: make(chan struct{}),
+ deliverOnce: &sync.Once{},
+ cancelledC: closeC,
+ netFetcher: nf,
+ cancel: func() {
+ cancelOnce.Do(func() {
+ cancel()
+ })
+ },
+ peers: peers,
+ span: span,
+ }
+}
+
+// Fetch fetches the chunk synchronously. It is called by NetStore.Get if the chunk is not available
+// locally.
+func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) {
+ atomic.AddInt32(&f.requestCnt, 1)
+ defer func() {
+ // if all the requests are done the fetcher can be cancelled
+ if atomic.AddInt32(&f.requestCnt, -1) == 0 {
+ f.cancel()
+ }
+ f.span.Finish()
+ }()
+
+ // The peer asking for the chunk. Store in the shared peers map, but delete after the request
+ // has been delivered
+ peer := rctx.Value("peer")
+ if peer != nil {
+ f.peers.Store(peer, time.Now())
+ defer f.peers.Delete(peer)
+ }
+
+ // If there is a source in the context then it is an offer, otherwise a request
+ sourceIF := rctx.Value("source")
+
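+ // hopCount defaults to zero when no "hopcount" value is set in the context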
+ hopCount, _ := rctx.Value("hopcount").(uint8)
+
+ if sourceIF != nil {
+ var source enode.ID
+ if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil {
return nil, err
}
-
- c := v.(Chunk)
-
- log.Trace("netstore.singleflight returned", "ref", ref.String(), "err", err)
-
- return c, nil
- }
-
- ctx, ssp := spancontext.StartSpan(
- ctx,
- "localstore.get")
- defer ssp.Finish()
-
- return ch, nil
-}
-
-// RemoteFetch is handling the retry mechanism when making a chunk request to our peers.
-// For a given chunk Request, we call RemoteGet, which selects the next eligible peer and
-// issues a RetrieveRequest and we wait for a delivery. If a delivery doesn't arrive within the SearchTimeout
-// we retry.
-func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) error {
- // while we haven't timed-out, and while we don't have a chunk,
- // iterate over peers and try to find a chunk
- metrics.GetOrRegisterCounter("remote.fetch", nil).Inc(1)
-
- ref := req.Addr
-
- for {
- metrics.GetOrRegisterCounter("remote.fetch.inner", nil).Inc(1)
-
- ctx, osp := spancontext.StartSpan(
- ctx,
- "remote.fetch")
- osp.LogFields(olog.String("ref", ref.String()))
-
- log.Trace("remote.fetch", "ref", ref)
-
- currentPeer, err := n.RemoteGet(ctx, req, n.localID)
- if err != nil {
- log.Trace(err.Error(), "ref", ref)
- osp.LogFields(olog.String("err", err.Error()))
- osp.Finish()
- return ErrNoSuitablePeer
- }
-
- // add peer to the set of peers to skip from now
- log.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String())
- req.PeersToSkip.Store(currentPeer.String(), time.Now())
-
- select {
- case <-fi.Delivered:
- log.Trace("remote.fetch, chunk delivered", "ref", ref)
-
- osp.LogFields(olog.Bool("delivered", true))
- osp.Finish()
- return nil
- case <-time.After(timeouts.SearchTimeout):
- metrics.GetOrRegisterCounter("remote.fetch.timeout.search", nil).Inc(1)
-
- osp.LogFields(olog.Bool("timeout", true))
- osp.Finish()
- break
- case <-ctx.Done(): // global fetcher timeout
- log.Trace("remote.fetch, fail", "ref", ref)
- metrics.GetOrRegisterCounter("remote.fetch.timeout.global", nil).Inc(1)
-
- osp.LogFields(olog.Bool("fail", true))
- osp.Finish()
- return ctx.Err()
- }
- }
-}
-
-// Has is the storage layer entry point to query the underlying
-// database to return if it has a chunk or not.
-func (n *NetStore) Has(ctx context.Context, ref Address) (bool, error) {
- return n.Store.Has(ctx, ref)
-}
-
-// GetOrCreateFetcher returns the Fetcher for a given chunk, if this chunk is not in the LocalStore.
-// If the chunk is in the LocalStore, it returns nil for the Fetcher and ok == false
-func (n *NetStore) GetOrCreateFetcher(ctx context.Context, ref Address, interestedParty string) (f *Fetcher, loaded bool, ok bool) {
- n.putMu.Lock()
- defer n.putMu.Unlock()
-
- has, err := n.Store.Has(ctx, ref)
- if err != nil {
- log.Error(err.Error())
- }
- if has {
- return nil, false, false
- }
-
- f = NewFetcher()
- v, loaded := n.fetchers.Get(ref.String())
- log.Trace("netstore.has-with-callback.loadorstore", "ref", ref.String(), "loaded", loaded)
- if loaded {
- f = v.(*Fetcher)
+ f.netFetcher.Offer(&source)
} else {
- f.CreatedBy = interestedParty
- n.fetchers.Add(ref.String(), f)
+ f.netFetcher.Request(hopCount)
}
- // if fetcher created by request, but we get a call from syncer, make sure we issue a second request
- if f.CreatedBy != interestedParty && !f.RequestedBySyncer {
- f.RequestedBySyncer = true
- return f, false, true
+ // wait until either the chunk is delivered or the context is done
+ select {
+ case <-rctx.Done():
+ return nil, rctx.Err()
+ case <-f.deliveredC:
+ return f.chunk, nil
+ case <-f.cancelledC:
+ return nil, fmt.Errorf("fetcher cancelled")
}
-
- return f, loaded, true
+}
+
+// deliver is called by NetStore.Put to notify all pending requests
+func (f *fetcher) deliver(ctx context.Context, ch Chunk) {
+ f.deliverOnce.Do(func() {
+ f.chunk = ch
+ // closing the deliveredC channel will terminate ongoing requests
+ close(f.deliveredC)
+ log.Trace("n.getFetcher close deliveredC", "ref", ch.Address())
+ })
}
diff --git a/storage/netstore_test.go b/storage/netstore_test.go
new file mode 100644
index 0000000000..41f37eb398
--- /dev/null
+++ b/storage/netstore_test.go
@@ -0,0 +1,702 @@
+// Copyright 2018 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package storage
+
+import (
+ "bytes"
+ "context"
+ "crypto/rand"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/p2p/enode"
+ "github.com/ethersphere/swarm/chunk"
+ "github.com/ethersphere/swarm/storage/localstore"
+)
+
+var sourcePeerID = enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9")
+
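+// mockNetFetcher records the Offer and Request calls made by the NetStore under test so assertions can be made on them.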
+type mockNetFetcher struct {
+ peers *sync.Map
+ sources []*enode.ID
+ peersPerRequest [][]Address
+ requestCalled bool
+ offerCalled bool
+ quit <-chan struct{}
+ ctx context.Context
+ hopCounts []uint8
+ mu sync.Mutex
+}
+
+func (m *mockNetFetcher) Offer(source *enode.ID) {
+ m.offerCalled = true
+ m.sources = append(m.sources, source)
+}
+
+func (m *mockNetFetcher) Request(hopCount uint8) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ m.requestCalled = true
+ var peers []Address
+ m.peers.Range(func(key interface{}, _ interface{}) bool {
+ peers = append(peers, common.FromHex(key.(string)))
+ return true
+ })
+ m.peersPerRequest = append(m.peersPerRequest, peers)
+ m.hopCounts = append(m.hopCounts, hopCount)
+}
+
+type mockNetFetchFuncFactory struct {
+ fetcher *mockNetFetcher
+}
+
+func (m *mockNetFetchFuncFactory) newMockNetFetcher(ctx context.Context, _ Address, peers *sync.Map) NetFetcher {
+ m.fetcher.peers = peers
+ m.fetcher.quit = ctx.Done()
+ m.fetcher.ctx = ctx
+ return m.fetcher
+}
+
+func newTestNetStore(t *testing.T) (netStore *NetStore, fetcher *mockNetFetcher, cleanup func()) {
+ t.Helper()
+
+ dir, err := ioutil.TempDir("", "swarm-storage-")
+ if err != nil {
+ t.Fatal(err)
+ }
+ localStore, err := localstore.New(dir, make([]byte, 32), nil)
+ if err != nil {
+ os.RemoveAll(dir)
+ t.Fatal(err)
+ }
+ cleanup = func() {
+ localStore.Close()
+ os.RemoveAll(dir)
+ }
+
+ fetcher = new(mockNetFetcher)
+ mockNetFetchFuncFactory := &mockNetFetchFuncFactory{
+ fetcher: fetcher,
+ }
+ netStore, err = NewNetStore(localStore, mockNetFetchFuncFactory.newMockNetFetcher)
+ if err != nil {
+ cleanup()
+ t.Fatal(err)
+ }
+ return netStore, fetcher, cleanup
+}
+
+// TestNetStoreGetAndPut tests calling NetStore.Get which is blocked until the same chunk is Put.
+// After the Put there should be no active fetchers, and the context created for the fetcher should
+// be cancelled.
+func TestNetStoreGetAndPut(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+
+ c := make(chan struct{}) // this channel ensures that the goroutine with the Put does not run earlier than the Get
+ putErrC := make(chan error)
+ go func() {
+ <-c // wait for the Get to be called
+ time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
+
+ // check if netStore created a fetcher in the Get call for the unavailable chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
+ putErrC <- errors.New("Expected netStore to use a fetcher for the Get call")
+ return
+ }
+
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
+ if err != nil {
+ putErrC <- fmt.Errorf("Expected no err got %v", err)
+ return
+ }
+
+ putErrC <- nil
+ }()
+
+ close(c)
+ recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) // this is blocked until the Put above is done
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+
+ if err := <-putErrC; err != nil {
+ t.Fatal(err)
+ }
+ // the retrieved chunk should be the same as what we Put
+ if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) {
+ t.Fatalf("Different chunk received than what was put")
+ }
+ // the chunk is already available locally, so there should be no active fetchers waiting for it
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after delivery")
+ }
+
+ // A fetcher was created when the Get was called (and the chunk was not available). The chunk
+ // was delivered with the Put call, so the fetcher should be cancelled now.
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+
+}
+
+// TestNetStoreGetAfterPut tests calling NetStore.Put and then NetStore.Get.
+// After the Put the chunk is available locally, so the Get can just retrieve it from LocalStore,
+// there is no need to create fetchers.
+func TestNetStoreGetAfterPut(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+
+ // First we Put the chunk, so the chunk will be available locally
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+
+ // Get should retrieve the chunk from LocalStore, without creating fetcher
+ recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+ // the retrieved chunk should be the same as what we Put
+ if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) {
+ t.Fatalf("Different chunk received than what was put")
+ }
+ // no fetcher offer or request should be created for a locally available chunk
+ if fetcher.offerCalled || fetcher.requestCalled {
+ t.Fatal("NetFetcher.offerCalled or requestCalled not expected to be called")
+ }
+ // no fetchers should be created for a locally available chunk
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to not have fetcher")
+ }
+
+}
+
+// TestNetStoreGetTimeout tests a Get call for an unavailable chunk and waits for timeout
+func TestNetStoreGetTimeout(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+
+ c := make(chan struct{}) // this channel ensures that the goroutine does not run earlier than the Get
+ fetcherErrC := make(chan error)
+ go func() {
+ <-c // wait for the Get to be called
+ time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
+
+ // check if netStore created a fetcher in the Get call for the unavailable chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
+ fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call")
+ return
+ }
+
+ fetcherErrC <- nil
+ }()
+
+ close(c)
+ // We call Get on this chunk, which is not in LocalStore. We don't Put it at all, so there will
+ // be a timeout
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
+
+ // Check if the timeout happened
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected context.DeadLineExceeded err got %v", err)
+ }
+
+ if err := <-fetcherErrC; err != nil {
+ t.Fatal(err)
+ }
+
+ // A fetcher was created, check if it has been removed after timeout
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after timeout")
+ }
+
+ // Check if the fetcher context has been cancelled after the timeout
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+// TestNetStoreGetCancel tests a Get call for an unavailable chunk, then cancels the context and checks
+// the errors
+func TestNetStoreGetCancel(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+
+ c := make(chan struct{}) // this channel ensures that the goroutine with the cancel does not run earlier than the Get
+ fetcherErrC := make(chan error, 1)
+ go func() {
+ <-c // wait for the Get to be called
+ time.Sleep(200 * time.Millisecond) // and a little more so it is surely called
+ // check if netStore created a fetcher in the Get call for the unavailable chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
+ fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call")
+ return
+ }
+
+ fetcherErrC <- nil
+ cancel()
+ }()
+
+ close(c)
+
+ // We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
+
+ if err := <-fetcherErrC; err != nil {
+ t.Fatal(err)
+ }
+
+ // After the context is cancelled above Get should return with an error
+ if err != context.Canceled {
+ t.Fatalf("Expected context.Canceled err got %v", err)
+ }
+
+ // A fetcher was created, check if it has been removed after cancel
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after cancel")
+ }
+
+ // Check if the fetcher context has been cancelled after the request context cancel
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+// TestNetStoreMultipleGetAndPut tests four Get calls for the same unavailable chunk. The chunk is
+// delivered with a Put, we have to make sure all Get calls return, and they use a single fetcher
+// for the chunk retrieval
+func TestNetStoreMultipleGetAndPut(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+
+ putErrC := make(chan error)
+ go func() {
+ // sleep to make sure Put is called after all the Get
+ time.Sleep(500 * time.Millisecond)
+ // check if netStore created exactly one fetcher for all Get calls
+ if netStore.fetchers.Len() != 1 {
+ putErrC <- errors.New("Expected netStore to use one fetcher for all Get calls")
+ return
+ }
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
+ if err != nil {
+ putErrC <- fmt.Errorf("Expected no err got %v", err)
+ return
+ }
+ putErrC <- nil
+ }()
+
+ count := 4
+ // call Get 4 times for the same unavailable chunk. The calls will be blocked until the Put above.
+ errC := make(chan error)
+ for i := 0; i < count; i++ {
+ go func() {
+ recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
+ if err != nil {
+ errC <- fmt.Errorf("Expected no err got %v", err)
+ }
+ if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) {
+ errC <- errors.New("Different chunk received than what was put")
+ }
+ errC <- nil
+ }()
+ }
+
+ if err := <-putErrC; err != nil {
+ t.Fatal(err)
+ }
+
+ timeout := time.After(1 * time.Second)
+
+ // The Get calls should return after Put, so no timeout expected
+ for i := 0; i < count; i++ {
+ select {
+ case err := <-errC:
+ if err != nil {
+ t.Fatal(err)
+ }
+ case <-timeout:
+ t.Fatalf("Timeout waiting for Get calls to return")
+ }
+ }
+
+ // A fetcher was created, check if it has been removed after delivery
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after delivery")
+ }
+
+ // The context for the fetcher should be cancelled after delivery
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+
+}
+
+// TestNetStoreFetchFuncTimeout tests a FetchFunc call for an unavailable chunk and waits for timeout
+func TestNetStoreFetchFuncTimeout(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ chunk := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
+ defer cancel()
+
+ // FetchFunc is called for an unavailable chunk, so the returned wait function should not be nil
+ wait := netStore.FetchFunc(ctx, chunk.Address())
+ if wait == nil {
+ t.Fatal("Expected wait function to be not nil")
+ }
+
+ // There should be an active fetcher for the chunk after the FetchFunc call
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatalf("Expected netStore to have one fetcher for the requested chunk")
+ }
+
+ // wait function should timeout because we don't deliver the chunk with a Put
+ err := wait(ctx)
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected context.DeadLineExceeded err got %v", err)
+ }
+
+ // the fetcher should be removed after timeout
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after timeout")
+ }
+
+ // the fetcher context should be cancelled after timeout
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+// TestNetStoreFetchFuncAfterPut tests that the FetchFunc should return nil for a locally available chunk
+func TestNetStoreFetchFuncAfterPut(t *testing.T) {
+ netStore, _, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+
+ // We deliver the created chunk with a Put
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+
+ // FetchFunc should return nil, because the chunk is available locally, no need to fetch it
+ wait := netStore.FetchFunc(ctx, ch.Address())
+ if wait != nil {
+ t.Fatal("Expected wait to be nil")
+ }
+
+ // No fetchers should be created at all
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to not have fetcher")
+ }
+}
+
+// TestNetStoreGetCallsRequest tests if Get creates a request on the NetFetcher for an unavailable chunk
+func TestNetStoreGetCallsRequest(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx := context.WithValue(context.Background(), "hopcount", uint8(5))
+ ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
+ defer cancel()
+
+ // We call Get for an unavailable chunk; it will time out because the chunk is not delivered
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
+
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected context.DeadlineExceeded err got %v", err)
+ }
+
+ // NetStore should call NetFetcher.Request and wait for the chunk
+ if !fetcher.requestCalled {
+ t.Fatal("Expected NetFetcher.Request to be called")
+ }
+
+ if fetcher.hopCounts[0] != 5 {
+ t.Fatalf("Expected NetFetcher.Request be called with hopCount 5, got %v", fetcher.hopCounts[0])
+ }
+}
+
+// TestNetStoreGetCallsOffer tests if Get creates an offer on the NetFetcher for an unavailable chunk
+// in case a source peer is provided in the context.
+func TestNetStoreGetCallsOffer(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ // If a source peer is added to the context, NetStore will handle it as an offer
+ ctx := context.WithValue(context.Background(), "source", sourcePeerID.String())
+ ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
+ defer cancel()
+
+ // We call Get for an unavailable chunk; it will time out because the chunk is not delivered
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address())
+
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err)
+ }
+
+ // NetStore should call NetFetcher.Offer with the source peer
+ if !fetcher.offerCalled {
+ t.Fatal("Expected NetFetcher.Request to be called")
+ }
+
+ if len(fetcher.sources) != 1 {
+ t.Fatalf("Expected fetcher sources length 1 got %v", len(fetcher.sources))
+ }
+
+ if fetcher.sources[0].String() != sourcePeerID.String() {
+ t.Fatalf("Expected fetcher source %v got %v", sourcePeerID, fetcher.sources[0])
+ }
+
+}
+
+// TestNetStoreFetcherCountPeers tests multiple NetStore.Get calls with a peer in the context.
+// There is no Put call, so the Get calls time out
+func TestNetStoreFetcherCountPeers(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ addr := randomAddr()
+ peers := []string{randomAddr().Hex(), randomAddr().Hex(), randomAddr().Hex()}
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+ errC := make(chan error)
+ nrGets := 3
+
+ // Call Get 3 times with a peer in context
+ for i := 0; i < nrGets; i++ {
+ peer := peers[i]
+ go func() {
+ ctx := context.WithValue(ctx, "peer", peer)
+ _, err := netStore.Get(ctx, chunk.ModeGetRequest, addr)
+ errC <- err
+ }()
+ }
+
+ // All 3 Get calls should timeout
+ for i := 0; i < nrGets; i++ {
+ err := <-errC
+ if err != context.DeadlineExceeded {
+ t.Fatalf("Expected \"%v\" error got \"%v\"", context.DeadlineExceeded, err)
+ }
+ }
+
+ // fetcher should be closed after timeout
+ select {
+ case <-fetcher.quit:
+ case <-time.After(3 * time.Second):
+ t.Fatalf("mockNetFetcher not closed after timeout")
+ }
+
+ // All 3 peers should be given to NetFetcher after the 3 Get calls
+ if len(fetcher.peersPerRequest) != nrGets {
+ t.Fatalf("Expected 3 got %v", len(fetcher.peersPerRequest))
+ }
+
+ for i, peers := range fetcher.peersPerRequest {
+ if len(peers) < i+1 {
+ t.Fatalf("Expected at least %v got %v", i+1, len(peers))
+ }
+ }
+}
+
+// TestNetStoreFetchFuncCalledMultipleTimes calls the wait function given by FetchFunc three times,
+// and checks there is still exactly one fetcher for one chunk. After the chunk is delivered, it checks
+// if the fetcher is closed.
+func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ ch := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+
+ // FetchFunc should return a non-nil wait function, because the chunk is not available
+ wait := netStore.FetchFunc(ctx, ch.Address())
+ if wait == nil {
+ t.Fatal("Expected wait function to be not nil")
+ }
+
+ // There should be exactly one fetcher for the chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
+ t.Fatalf("Expected netStore to have one fetcher for the requested chunk")
+ }
+
+ // Call wait three times in parallel
+ count := 3
+ errC := make(chan error)
+ for i := 0; i < count; i++ {
+ go func() {
+ errC <- wait(ctx)
+ }()
+ }
+
+ // sleep a little so the wait functions are called above
+ time.Sleep(100 * time.Millisecond)
+
+ // there should be still only one fetcher, because all wait calls are for the same chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil {
+ t.Fatal("Expected netStore to have one fetcher for the requested chunk")
+ }
+
+ // Deliver the chunk with a Put
+ _, err := netStore.Put(ctx, chunk.ModePutRequest, ch)
+ if err != nil {
+ t.Fatalf("Expected no err got %v", err)
+ }
+
+ // wait until all wait calls return (because the chunk is delivered)
+ for i := 0; i < count; i++ {
+ err := <-errC
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // There should be no more fetchers for the delivered chunk
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after delivery")
+ }
+
+ // The context for the fetcher should be cancelled after delivery
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+// TestNetStoreFetcherLifeCycleWithTimeout is similar to TestNetStoreFetchFuncCalledMultipleTimes,
+// the only difference is that we don't deliver the chunk, just wait for the timeout
+func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) {
+ netStore, fetcher, cleanup := newTestNetStore(t)
+ defer cleanup()
+
+ chunk := GenerateRandomChunk(chunk.DefaultSize)
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+
+ // FetchFunc should return a non-nil wait function, because the chunk is not available
+ wait := netStore.FetchFunc(ctx, chunk.Address())
+ if wait == nil {
+ t.Fatal("Expected wait function to be not nil")
+ }
+
+ // There should be exactly one fetcher for the chunk
+ if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil {
+ t.Fatalf("Expected netStore to have one fetcher for the requested chunk")
+ }
+
+ // Call wait three times in parallel
+ count := 3
+ errC := make(chan error)
+ for i := 0; i < count; i++ {
+ go func() {
+ rctx, rcancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer rcancel()
+ err := wait(rctx)
+ if err != context.DeadlineExceeded {
+ errC <- fmt.Errorf("Expected err %v got %v", context.DeadlineExceeded, err)
+ return
+ }
+ errC <- nil
+ }()
+ }
+
+ // wait until all wait calls timeout
+ for i := 0; i < count; i++ {
+ err := <-errC
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // There should be no more fetchers after timeout
+ if netStore.fetchers.Len() != 0 {
+ t.Fatal("Expected netStore to remove the fetcher after delivery")
+ }
+
+ // The context for the fetcher should be cancelled after timeout
+ select {
+ case <-fetcher.ctx.Done():
+ default:
+ t.Fatal("Expected fetcher context to be cancelled")
+ }
+}
+
+func randomAddr() Address {
+ addr := make([]byte, 32)
+ rand.Read(addr)
+ return Address(addr)
+}
diff --git a/storage/request.go b/storage/request.go
deleted file mode 100644
index 47bb66cb19..0000000000
--- a/storage/request.go
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright 2018 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package storage
-
-import (
- "sync"
- "time"
-
- "github.com/ethersphere/swarm/network/timeouts"
-
- "github.com/ethereum/go-ethereum/p2p/enode"
-)
-
-// Request encapsulates all the necessary arguments when making a request to NetStore.
-// These could have also been added as part of the interface of NetStore.Get, but a request struct seemed
-// like a better option
-type Request struct {
- Addr Address // chunk address
- Origin enode.ID // who is sending us that request? we compare Origin to the suggested peer from RequestFromPeers
- PeersToSkip sync.Map // peers not to request chunk from
-}
-
-// NewRequest returns a new instance of Request based on chunk address skip check and
-// a map of peers to skip.
-func NewRequest(addr Address) *Request {
- return &Request{
- Addr: addr,
- }
-}
-
-// SkipPeer returns if the peer with nodeID should not be requested to deliver a chunk.
-// Peers to skip are kept per Request and for a time period of FailedPeerSkipDelay.
-func (r *Request) SkipPeer(nodeID string) bool {
- val, ok := r.PeersToSkip.Load(nodeID)
- if !ok {
- return false
- }
- t, ok := val.(time.Time)
- if ok && time.Now().After(t.Add(timeouts.FailedPeerSkipDelay)) {
- r.PeersToSkip.Delete(nodeID)
- return false
- }
- return true
-}
diff --git a/swarm.go b/swarm.go
index d6f40beed3..1bdaa63bfe 100644
--- a/swarm.go
+++ b/swarm.go
@@ -64,6 +64,7 @@ var (
startCounter = metrics.NewRegisteredCounter("stack,start", nil)
stopCounter = metrics.NewRegisteredCounter("stack,stop", nil)
uptimeGauge = metrics.NewRegisteredGauge("stack.uptime", nil)
+ requestsCacheGauge = metrics.NewRegisteredGauge("storage.cache.requests.size", nil)
)
// the swarm stack
@@ -173,15 +174,17 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
feedsHandler,
)
- nodeID := config.Enode.ID()
- self.netStore = storage.NewNetStore(lstore, nodeID)
+ self.netStore, err = storage.NewNetStore(lstore, nil)
+ if err != nil {
+ return nil, err
+ }
to := network.NewKademlia(
common.FromHex(config.BzzKey),
network.NewKadParams(),
)
delivery := stream.NewDelivery(to, self.netStore)
- self.netStore.RemoteGet = delivery.RequestFromPeers
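+ // remote retrieval goes through fetchers created by a factory backed by stream delivery's RequestFromPeers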
+ self.netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, config.DeliverySkipCheck).New
feedsHandler.SetStore(self.netStore)
@@ -194,6 +197,8 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
self.accountingMetrics = protocols.SetupAccountingMetrics(10*time.Second, filepath.Join(config.Path, "metrics.db"))
}
+ nodeID := config.Enode.ID()
+
syncing := stream.SyncingAutoSubscribe
if !config.SyncEnabled || config.LightNodeEnabled {
syncing = stream.SyncingDisabled
@@ -209,8 +214,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
tags := chunk.NewTags() //todo load from state store
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
- lnetStore := storage.NewLNetStore(self.netStore)
- self.fileStore = storage.NewFileStore(lnetStore, self.config.FileStoreParams, tags)
+ self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams, tags)
log.Debug("Setup local storage")
@@ -407,6 +411,7 @@ func (s *Swarm) Start(srv *p2p.Server) error {
select {
case <-time.After(updateGaugesPeriod):
uptimeGauge.Update(time.Since(startTime).Nanoseconds())
+ requestsCacheGauge.Update(int64(s.netStore.RequestsCacheLen()))
case <-doneC:
return
}
diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go
deleted file mode 100644
index 97a1aa4bb3..0000000000
--- a/vendor/golang.org/x/sync/singleflight/singleflight.go
+++ /dev/null
@@ -1,120 +0,0 @@
-// Copyright 2013 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Package singleflight provides a duplicate function call suppression
-// mechanism.
-package singleflight // import "golang.org/x/sync/singleflight"
-
-import "sync"
-
-// call is an in-flight or completed singleflight.Do call
-type call struct {
- wg sync.WaitGroup
-
- // These fields are written once before the WaitGroup is done
- // and are only read after the WaitGroup is done.
- val interface{}
- err error
-
- // forgotten indicates whether Forget was called with this call's key
- // while the call was still in flight.
- forgotten bool
-
- // These fields are read and written with the singleflight
- // mutex held before the WaitGroup is done, and are read but
- // not written after the WaitGroup is done.
- dups int
- chans []chan<- Result
-}
-
-// Group represents a class of work and forms a namespace in
-// which units of work can be executed with duplicate suppression.
-type Group struct {
- mu sync.Mutex // protects m
- m map[string]*call // lazily initialized
-}
-
-// Result holds the results of Do, so they can be passed
-// on a channel.
-type Result struct {
- Val interface{}
- Err error
- Shared bool
-}
-
-// Do executes and returns the results of the given function, making
-// sure that only one execution is in-flight for a given key at a
-// time. If a duplicate comes in, the duplicate caller waits for the
-// original to complete and receives the same results.
-// The return value shared indicates whether v was given to multiple callers.
-func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
- g.mu.Lock()
- if g.m == nil {
- g.m = make(map[string]*call)
- }
- if c, ok := g.m[key]; ok {
- c.dups++
- g.mu.Unlock()
- c.wg.Wait()
- return c.val, c.err, true
- }
- c := new(call)
- c.wg.Add(1)
- g.m[key] = c
- g.mu.Unlock()
-
- g.doCall(c, key, fn)
- return c.val, c.err, c.dups > 0
-}
-
-// DoChan is like Do but returns a channel that will receive the
-// results when they are ready.
-func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
- ch := make(chan Result, 1)
- g.mu.Lock()
- if g.m == nil {
- g.m = make(map[string]*call)
- }
- if c, ok := g.m[key]; ok {
- c.dups++
- c.chans = append(c.chans, ch)
- g.mu.Unlock()
- return ch
- }
- c := &call{chans: []chan<- Result{ch}}
- c.wg.Add(1)
- g.m[key] = c
- g.mu.Unlock()
-
- go g.doCall(c, key, fn)
-
- return ch
-}
-
-// doCall handles the single call for a key.
-func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
- c.val, c.err = fn()
- c.wg.Done()
-
- g.mu.Lock()
- if !c.forgotten {
- delete(g.m, key)
- }
- for _, ch := range c.chans {
- ch <- Result{c.val, c.err, c.dups > 0}
- }
- g.mu.Unlock()
-}
-
-// Forget tells the singleflight to forget about a key. Future calls
-// to Do for this key will call the function rather than waiting for
-// an earlier call to complete.
-func (g *Group) Forget(key string) {
- g.mu.Lock()
- if c, ok := g.m[key]; ok {
- c.forgotten = true
- }
- delete(g.m, key)
- g.mu.Unlock()
-}
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 2dcfbba3fd..06dd683cdd 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -1240,12 +1240,6 @@
"revision": "eb5bcb51f2a31c7d5141d810b70815c05d9c9146",
"revisionTime": "2019-04-03T01:06:53Z"
},
- {
- "checksumSHA1": "FuQoDr6zh5GsiVVyo3oDZcUVC3c=",
- "path": "golang.org/x/sync/singleflight",
- "revision": "112230192c580c3556b8cee6403af37a4fc5f28c",
- "revisionTime": "2019-04-22T22:11:18Z"
- },
{
"checksumSHA1": "4TEYFKrAUuwBMqExjQBsnf/CgjQ=",
"path": "golang.org/x/sync/syncmap",