From 604960938bd8056f5f3bb7261b2546748eb73c68 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Mon, 17 Jun 2019 10:30:55 +0200 Subject: [PATCH] Revert "simplification of Fetchers (#1344)" (#1491) This reverts commit 0b724bd4d5448d77dde26684f06529853b4d8fe0. --- network/fetcher.go | 336 +++++++++ network/fetcher_test.go | 476 ++++++++++++ network/stream/common_test.go | 30 +- network/stream/delivery.go | 250 ++----- network/stream/delivery_test.go | 320 +++++++- network/stream/intervals_test.go | 35 +- network/stream/messages.go | 7 +- network/stream/snapshot_sync_test.go | 4 +- network/stream/stream.go | 2 +- network/stream/streamer_test.go | 8 +- network/stream/syncer.go | 24 +- network/timeouts/timeouts.go | 24 - storage/feed/handler.go | 7 +- storage/feed/lookup/lookup.go | 2 - storage/feed/testutil.go | 20 +- storage/filestore.go | 5 +- storage/lnetstore.go | 46 -- storage/netstore.go | 487 ++++++------ storage/netstore_test.go | 702 ++++++++++++++++++ storage/request.go | 58 -- swarm.go | 15 +- .../x/sync/singleflight/singleflight.go | 120 --- vendor/vendor.json | 6 - 23 files changed, 2242 insertions(+), 742 deletions(-) create mode 100644 network/fetcher.go create mode 100644 network/fetcher_test.go delete mode 100644 network/timeouts/timeouts.go delete mode 100644 storage/lnetstore.go create mode 100644 storage/netstore_test.go delete mode 100644 storage/request.go delete mode 100644 vendor/golang.org/x/sync/singleflight/singleflight.go diff --git a/network/fetcher.go b/network/fetcher.go new file mode 100644 index 0000000000..0908c31efd --- /dev/null +++ b/network/fetcher.go @@ -0,0 +1,336 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package network + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/tracing" + olog "github.com/opentracing/opentracing-go/log" +) + +const ( + defaultSearchTimeout = 1 * time.Second + // maximum number of forwarded requests (hops), to make sure requests are not + // forwarded forever in peer loops + maxHopCount uint8 = 20 +) + +// Time to consider peer to be skipped. +// Also used in stream delivery. +var RequestTimeout = 10 * time.Second + +type RequestFunc func(context.Context, *Request) (*enode.ID, chan struct{}, error) + +// Fetcher is created when a chunk is not found locally. It starts a request handler loop once and +// keeps it alive until all active requests are completed. This can happen: +// 1. either because the chunk is delivered +// 2. or because the requester cancelled/timed out +// Fetcher self destroys itself after it is completed. 
+// TODO: cancel all forward requests after termination +type Fetcher struct { + protoRequestFunc RequestFunc // request function fetcher calls to issue retrieve request for a chunk + addr storage.Address // the address of the chunk to be fetched + offerC chan *enode.ID // channel of sources (IDs of peers that offered the chunk) + requestC chan uint8 // channel for incoming requests (with the hopCount value in it) + searchTimeout time.Duration // time to wait after a request before the search is extended to another peer + skipCheck bool // copied into the SkipCheck field of every Request issued by the fetcher + ctx context.Context // lifetime of the fetcher: once done, run quits and Offer/Request return without effect +} + +type Request struct { + Addr storage.Address // chunk address + Source *enode.ID // nodeID of peer to request from (can be nil) + SkipCheck bool // whether to offer the chunk first or deliver directly + peersToSkip *sync.Map // peers not to request chunk from (only makes sense if source is nil); keyed by node ID string + HopCount uint8 // number of forwarded requests (hops) +} + +// NewRequest returns a new instance of Request based on chunk address, skip check flag and +// a map of peers to skip. Source and HopCount are left at their zero values. +func NewRequest(addr storage.Address, skipCheck bool, peersToSkip *sync.Map) *Request { + return &Request{ + Addr: addr, + SkipCheck: skipCheck, + peersToSkip: peersToSkip, + } +} + +// SkipPeer reports whether the peer with nodeID should not be requested to deliver a chunk. +// Peers to skip are kept per Request and for a time period of RequestTimeout. +// This function is used in stream package in Delivery.RequestFromPeers to optimize +// requests for chunks. 
+func (r *Request) SkipPeer(nodeID string) bool { + val, ok := r.peersToSkip.Load(nodeID) + if !ok { + return false + } + t, ok := val.(time.Time) + if ok && time.Now().After(t.Add(RequestTimeout)) { + // deadline expired: forget the peer so that it can be requested again + r.peersToSkip.Delete(nodeID) + return false + } + return true // still within RequestTimeout, or the stored value is not a time.Time (skipped permanently) +} + +// FetcherFactory is initialised with a request function and can create fetchers. +type FetcherFactory struct { + request RequestFunc + skipCheck bool +} + +// NewFetcherFactory takes a request function and skip check parameter and creates a FetcherFactory. +func NewFetcherFactory(request RequestFunc, skipCheck bool) *FetcherFactory { + return &FetcherFactory{ + request: request, + skipCheck: skipCheck, + } +} + +// New constructs a new Fetcher for the chunk with address source. All peers in the peers map +// (the peers to skip) are not requested to deliver the given chunk. peers should always +// contain the peers which are actively requesting this chunk, to make sure we +// don't request back the chunks from them. +// The created Fetcher is started (its run loop is launched in a go routine) and returned. +func (f *FetcherFactory) New(ctx context.Context, source storage.Address, peers *sync.Map) storage.NetFetcher { + fetcher := NewFetcher(ctx, source, f.request, f.skipCheck) + go fetcher.run(peers) + return fetcher +} + +// NewFetcher creates a new Fetcher for the given chunk address using the given request function. It does not start the run loop. +func NewFetcher(ctx context.Context, addr storage.Address, rf RequestFunc, skipCheck bool) *Fetcher { + return &Fetcher{ + addr: addr, + protoRequestFunc: rf, + offerC: make(chan *enode.ID), + requestC: make(chan uint8), + searchTimeout: defaultSearchTimeout, + skipCheck: skipCheck, + ctx: ctx, + } +} + +// Offer is called when an upstream peer offers the chunk via syncing as part of `OfferedHashesMsg` and the node does not have the chunk locally. 
+func (f *Fetcher) Offer(source *enode.ID) { + // First we need to have this select to make sure that we return if context is done + select { + case <-f.ctx.Done(): + return + default: + } + + // This select alone would not guarantee that we return if context is done, it could potentially + // push to offerC instead if offerC is available (see number 2 in https://golang.org/ref/spec#Select_statements) + select { + case f.offerC <- source: + case <-f.ctx.Done(): + } +} + +// Request is called when an upstream peer requests the chunk as part of `RetrieveRequestMsg`, or from a local request through FileStore, and the node does not have the chunk locally. +func (f *Fetcher) Request(hopCount uint8) { + // First we need to have this select to make sure that we return if context is done + select { + case <-f.ctx.Done(): + return + default: + } + + if hopCount >= maxHopCount { + log.Debug("fetcher request hop count limit reached", "hops", hopCount) + return + } + + // This select alone would not guarantee that we return if context is done, it could potentially + // push to requestC instead if requestC is available (see number 2 in https://golang.org/ref/spec#Select_statements) + select { + case f.requestC <- hopCount + 1: + case <-f.ctx.Done(): + } +} + +// run is the request handler loop of the Fetcher +// it keeps the Fetcher alive within the lifecycle of the Fetcher's context (f.ctx) +func (f *Fetcher) run(peers *sync.Map) { + var ( + doRequest bool // determines if retrieval is initiated in the current iteration + wait *time.Timer // timer for search timeout + waitC <-chan time.Time // timer channel + sources []*enode.ID // known sources, ie. peers that offered the chunk + requested bool // true if the chunk was actually requested + hopCount uint8 // hop count received with the latest incoming request + ) + gone := make(chan *enode.ID) // channel to signal that a peer we requested from disconnected + + // loop that keeps the fetching process alive + // after every request a timer is set. 
If this goes off we request again from another peer + // note that the previous request is still alive and has the chance to deliver, so + // requesting again extends the search. ie., + // if a peer we requested from is gone we issue a new request, so the number of active + // requests never decreases + for { + select { + + // incoming offer + case source := <-f.offerC: + log.Trace("new source", "peer addr", source, "request addr", f.addr) + // 1) the chunk is offered by a syncing peer + // add to known sources + sources = append(sources, source) + // launch a request to the source iff the chunk was requested (not just expected because it's offered by a syncing peer) + doRequest = requested + + // incoming request + case hopCount = <-f.requestC: + // 2) chunk is requested, set requested flag + // launch a request iff none has been launched yet + doRequest = !requested + log.Trace("new request", "request addr", f.addr, "doRequest", doRequest) + requested = true + + // peer we requested from is gone. 
fall back to another + // and remove the peer from the peers map + case id := <-gone: + peers.Delete(id.String()) + doRequest = requested + log.Trace("peer gone", "peer id", id.String(), "request addr", f.addr, "doRequest", doRequest) + + // search timeout: too much time passed since the last request, + // extend the search to a new peer if we can find one + case <-waitC: + doRequest = requested + log.Trace("search timed out: requesting", "request addr", f.addr, "doRequest", doRequest) + + // Fetcher context is done, can quit + case <-f.ctx.Done(): + log.Trace("terminate fetcher", "request addr", f.addr) + // TODO: send cancellations to all peers left over in peers map (i.e., those we requested from) + return + } + + // need to issue a new request + if doRequest { + var err error + sources, err = f.doRequest(gone, peers, sources, hopCount) + if err != nil { + log.Info("unable to request", "request addr", f.addr, "err", err) + } + } + + // once the chunk was requested, arm the search timer: create it on first use, reset it otherwise + if requested { + if wait == nil { + wait = time.NewTimer(f.searchTimeout) + defer wait.Stop() + waitC = wait.C + } else { + // stop the timer and drain the channel if it was not drained earlier + if !wait.Stop() { + select { + case <-wait.C: + default: + } + } + // reset the timer to go off after f.searchTimeout + wait.Reset(f.searchTimeout) + } + } + doRequest = false + } +} + +// doRequest attempts at finding a peer to request the chunk from +// * first it tries to request explicitly from peers that are known to have offered the chunk +// * if there are no such peers (available) it tries to request it from a peer closest to the chunk address +// excluding those in the peersToSkip map +// * if no such peer is found an error is returned +// +// if a request is successful, +// * the peer's address is added to the set of peers to skip +// * the peer's address is removed from prospective sources, and +// * a go routine is started that reports on the gone channel if the peer is 
disconnected (or terminated their streamer) +func (f *Fetcher) doRequest(gone chan *enode.ID, peersToSkip *sync.Map, sources []*enode.ID, hopCount uint8) ([]*enode.ID, error) { + var i int + var sourceID *enode.ID + var quit chan struct{} + + req := &Request{ + Addr: f.addr, + SkipCheck: f.skipCheck, + peersToSkip: peersToSkip, + HopCount: hopCount, + } + + foundSource := false + // iterate over known sources + for i = 0; i < len(sources); i++ { + req.Source = sources[i] + var err error + log.Trace("fetcher.doRequest", "request addr", f.addr, "peer", req.Source.String()) + sourceID, quit, err = f.protoRequestFunc(f.ctx, req) + if err == nil { + // remove the peer from known sources + // Note: we can modify the source although we are looping on it, because we break from the loop immediately + sources = append(sources[:i], sources[i+1:]...) + foundSource = true + break + } + } + + // if there are no known sources, or none available, we try request from a closest node + if !foundSource { + req.Source = nil + var err error + sourceID, quit, err = f.protoRequestFunc(f.ctx, req) + if err != nil { + // if no peers found to request from + return sources, err + } + } + // add peer to the set of peers to skip from now + peersToSkip.Store(sourceID.String(), time.Now()) + + // if the quit channel is closed, it indicates that the source peer we requested from + // disconnected or terminated its streamer + // here start a go routine that watches this channel and reports the source peer on the gone channel + // this go routine quits if the fetcher global context is done to prevent process leak + go func() { + select { + case <-quit: + // the run loop stops receiving from the unbuffered gone channel once the fetcher + // context is done, so the report itself must give up on cancellation as well, + // otherwise this go routine would block on the send forever + select { + case gone <- sourceID: + case <-f.ctx.Done(): + } + case <-f.ctx.Done(): + } + + // finish the request span + spanId := fmt.Sprintf("stream.send.request.%v.%v", *sourceID, req.Addr) + span := tracing.ShiftSpanByKey(spanId) + + if span != nil { + span.LogFields(olog.String("finish", "from doRequest")) + span.Finish() + } + }() + return sources, nil +} diff --git 
a/network/fetcher_test.go b/network/fetcher_test.go new file mode 100644 index 0000000000..4e464f10f3 --- /dev/null +++ b/network/fetcher_test.go @@ -0,0 +1,476 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package network + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/p2p/enode" +) + +var requestedPeerID = enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") +var sourcePeerID = enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9") + +// mockRequester pushes every request to the requestC channel when its doRequest function is called +type mockRequester struct { + // requests []Request + requestC chan *Request // when a request is coming it is pushed to requestC + waitTimes []time.Duration // with waitTimes[i] you can define how much to wait on the ith request (optional) + count int //counts the number of requests + quitC chan struct{} +} + +func newMockRequester(waitTimes ...time.Duration) *mockRequester { + return &mockRequester{ + requestC: make(chan *Request), + waitTimes: waitTimes, + quitC: make(chan struct{}), + } +} + +func (m *mockRequester) doRequest(ctx context.Context, request *Request) (*enode.ID, chan struct{}, error) { + waitTime := 
time.Duration(0) + if m.count < len(m.waitTimes) { + waitTime = m.waitTimes[m.count] + m.count++ + } + time.Sleep(waitTime) + m.requestC <- request + + // if there is a Source in the request use that, if not use the global requestedPeerId + source := request.Source + if source == nil { + source = &requestedPeerID + } + return source, m.quitC, nil +} + +// TestFetcherSingleRequest creates a Fetcher using mockRequester, and run it with a sample set of peers to skip. +// mockRequester pushes a Request on a channel every time the request function is called. Using +// this channel we test if calling Fetcher.Request calls the request function, and whether it uses +// the correct peers to skip which we provided for the fetcher.run function. +func TestFetcherSingleRequest(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := NewFetcher(ctx, addr, requester.doRequest, true) + + peers := []string{"a", "b", "c", "d"} + peersToSkip := &sync.Map{} + for _, p := range peers { + peersToSkip.Store(p, time.Now()) + } + + go fetcher.run(peersToSkip) + + fetcher.Request(0) + + select { + case request := <-requester.requestC: + // request should contain all peers from peersToSkip provided to the fetcher + for _, p := range peers { + if _, ok := request.peersToSkip.Load(p); !ok { + t.Fatalf("request.peersToSkip misses peer") + } + } + + // source peer should be also added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(requestedPeerID.String()); !ok { + t.Fatalf("request.peersToSkip does not contain peer returned by the request function") + } + + // hopCount in the forwarded request should be incremented + if request.HopCount != 1 { + t.Fatalf("Expected request.HopCount 1 got %v", request.HopCount) + } + + // fetch should trigger a request, if it doesn't happen in time, test should fail + case <-time.After(200 * 
time.Millisecond): + t.Fatalf("fetch timeout") + } +} + +// TestCancelStopsFetcher tests that a cancelled fetcher does not initiate further requests even if its fetch function is called +func TestFetcherCancelStopsFetcher(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + + ctx, cancel := context.WithCancel(context.Background()) + + fetcher := NewFetcher(ctx, addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + // we start the fetcher, and then we immediately cancel the context + go fetcher.run(peersToSkip) + cancel() + + // we call Request with an active context + fetcher.Request(0) + + // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening + select { + case <-requester.requestC: + t.Fatalf("cancelled fetcher initiated request") + case <-time.After(200 * time.Millisecond): + } +} + +// TestFetchCancelStopsRequest tests that calling a Request function with a cancelled context does not initiate a request +func TestFetcherCancelStopsRequest(t *testing.T) { + t.Skip("since context is now per fetcher, this test is likely redundant") + + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := NewFetcher(ctx, addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + // we start the fetcher with an active context + go fetcher.run(peersToSkip) + + // we call Request with a cancelled context + fetcher.Request(0) + + // fetcher should not initiate request, we can only check by waiting a bit and making sure no request is happening + select { + case <-requester.requestC: + t.Fatalf("cancelled fetch function initiated request") + case <-time.After(200 * time.Millisecond): + } + + // if there is another Request with active context, there should be a request, because the fetcher itself is not cancelled + fetcher.Request(0) + + select { + case 
<-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("expected request") + } +} + +// TestOfferUsesSource tests Fetcher Offer behavior. +// In this case there should be 1 (and only one) request initiated from the source peer, and the +// source nodeid should appear in the peersToSkip map. +func TestFetcherOfferUsesSource(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := NewFetcher(ctx, addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + // start the fetcher + go fetcher.run(peersToSkip) + + // call the Offer function with the source peer + fetcher.Offer(&sourcePeerID) + + // fetcher should not initiate request + select { + case <-requester.requestC: + t.Fatalf("fetcher initiated request") + case <-time.After(200 * time.Millisecond): + } + + // call Request after the Offer + fetcher.Request(0) + + // there should be exactly 1 request coming from fetcher + var request *Request + select { + case request = <-requester.requestC: + if *request.Source != sourcePeerID { + t.Fatalf("Expected source id %v got %v", sourcePeerID, request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + select { + case <-requester.requestC: + t.Fatalf("Fetcher number of requests expected 1 got 2") + case <-time.After(200 * time.Millisecond): + } + + // source peer should be added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok { + t.Fatalf("SourcePeerId not added to peersToSkip") + } +} + +func TestFetcherOfferAfterRequestUsesSourceFromContext(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := NewFetcher(ctx, addr, requester.doRequest, 
true) + + peersToSkip := &sync.Map{} + + // start the fetcher + go fetcher.run(peersToSkip) + + // call Request first + fetcher.Request(0) + + // there should be a request coming from fetcher + var request *Request + select { + case request = <-requester.requestC: + if request.Source != nil { + t.Fatalf("Incorrect source peer id, expected nil got %v", request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + // after the Request call Offer + fetcher.Offer(&sourcePeerID) + + // there should be a request coming from fetcher + select { + case request = <-requester.requestC: + if *request.Source != sourcePeerID { + t.Fatalf("Incorrect source peer id, expected %v got %v", sourcePeerID, request.Source) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetcher did not initiate request") + } + + // source peer should be added to peersToSkip eventually + time.Sleep(100 * time.Millisecond) + if _, ok := request.peersToSkip.Load(sourcePeerID.String()); !ok { + t.Fatalf("SourcePeerId not added to peersToSkip") + } +} + +// TestFetcherRetryOnTimeout tests that fetch retries after searchTimeOut has passed +func TestFetcherRetryOnTimeout(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := NewFetcher(ctx, addr, requester.doRequest, true) + // set searchTimeOut to low value so the test is quicker + fetcher.searchTimeout = 250 * time.Millisecond + + peersToSkip := &sync.Map{} + + // start the fetcher + go fetcher.run(peersToSkip) + + // call the fetch function with an active context + fetcher.Request(0) + + // after 100ms the first request should be initiated + time.Sleep(100 * time.Millisecond) + + select { + case <-requester.requestC: + default: + t.Fatalf("fetch did not initiate request") + } + + // after another 100ms no new request should be initiated, because search timeout is 250ms + 
time.Sleep(100 * time.Millisecond) + + select { + case <-requester.requestC: + t.Fatalf("unexpected request from fetcher") + default: + } + + // after another 300ms search timeout is over, there should be a new request + time.Sleep(300 * time.Millisecond) + + select { + case <-requester.requestC: + default: + t.Fatalf("fetch did not retry request") + } +} + +// TestFetcherFactory creates a FetcherFactory and checks if the factory really creates and starts +// a Fetcher when it return a fetch function. We test the fetching functionality just by checking if +// a request is initiated when the fetch function is called +func TestFetcherFactory(t *testing.T) { + requester := newMockRequester(100 * time.Millisecond) + addr := make([]byte, 32) + fetcherFactory := NewFetcherFactory(requester.doRequest, false) + + peersToSkip := &sync.Map{} + + fetcher := fetcherFactory.New(context.Background(), addr, peersToSkip) + + fetcher.Request(0) + + // check if the created fetchFunction really starts a fetcher and initiates a request + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("fetch timeout") + } + +} + +func TestFetcherRequestQuitRetriesRequest(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := NewFetcher(ctx, addr, requester.doRequest, true) + + // make sure the searchTimeout is long so it is sure the request is not + // retried because of timeout + fetcher.searchTimeout = 10 * time.Second + + peersToSkip := &sync.Map{} + + go fetcher.run(peersToSkip) + + fetcher.Request(0) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("request is not initiated") + } + + close(requester.quitC) + + select { + case <-requester.requestC: + case <-time.After(200 * time.Millisecond): + t.Fatalf("request is not initiated after failed request") + } +} + +// TestRequestSkipPeer checks 
if PeerSkip function will skip provided peer +// and not skip an unknown one. +func TestRequestSkipPeer(t *testing.T) { + addr := make([]byte, 32) + peers := []enode.ID{ + enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8"), + enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9"), + } + + peersToSkip := new(sync.Map) + peersToSkip.Store(peers[0].String(), time.Now()) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peers[0].String()) { + t.Errorf("peer not skipped") + } + + if r.SkipPeer(peers[1].String()) { + t.Errorf("peer skipped") + } +} + +// TestRequestSkipPeerExpired checks if a peer to skip is not skipped +// after RequestTimeout has passed. +func TestRequestSkipPeerExpired(t *testing.T) { + addr := make([]byte, 32) + peer := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") + + // set RequestTimeout to a low value and reset it after the test + defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout) + RequestTimeout = 250 * time.Millisecond + + peersToSkip := new(sync.Map) + peersToSkip.Store(peer.String(), time.Now()) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } + + time.Sleep(500 * time.Millisecond) + + if r.SkipPeer(peer.String()) { + t.Errorf("peer skipped") + } +} + +// TestRequestSkipPeerPermanent checks that a peer to skip is still skipped +// after RequestTimeout has passed if it is marked for permanent skipping, +// i.e. the value stored for it in the peersToSkip map is not a time.Time. 
+func TestRequestSkipPeerPermanent(t *testing.T) { + addr := make([]byte, 32) + peer := enode.HexID("3431c3939e1ee2a6345e976a8234f9870152d64879f30bc272a074f6859e75e8") + + // set RequestTimeout to a low value and reset it after the test + defer func(t time.Duration) { RequestTimeout = t }(RequestTimeout) + RequestTimeout = 250 * time.Millisecond + + peersToSkip := new(sync.Map) + peersToSkip.Store(peer.String(), true) + r := NewRequest(addr, false, peersToSkip) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } + + time.Sleep(500 * time.Millisecond) + + if !r.SkipPeer(peer.String()) { + t.Errorf("peer not skipped") + } +} + +// TestFetcherMaxHopCount tests that a request whose hop count has already reached +// maxHopCount is dropped by the fetcher instead of being forwarded. +func TestFetcherMaxHopCount(t *testing.T) { + requester := newMockRequester() + addr := make([]byte, 32) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fetcher := NewFetcher(ctx, addr, requester.doRequest, true) + + peersToSkip := &sync.Map{} + + go fetcher.run(peersToSkip) + + // issue a request with the hop count already at the limit; without this call + // the check below would pass trivially because nothing was ever requested + fetcher.Request(maxHopCount) + + // if hopCount is already at max no request should be initiated + select { + case <-requester.requestC: + t.Fatalf("fetcher initiated request although hop count limit was reached") + case <-time.After(200 * time.Millisecond): + } +} diff --git a/network/stream/common_test.go b/network/stream/common_test.go index 739ff548fb..6d7d7d68e5 100644 --- a/network/stream/common_test.go +++ b/network/stream/common_test.go @@ -56,6 +56,7 @@ var ( bucketKeyStore = simulation.BucketKey("store") bucketKeyFileStore = simulation.BucketKey("filestore") + bucketKeyNetStore = simulation.BucketKey("netstore") bucketKeyDelivery = simulation.BucketKey("delivery") bucketKeyRegistry = simulation.BucketKey("registry") @@ -80,7 +81,7 @@ func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*ne return nil, nil, nil, nil, err } - netStore.RemoteGet = delivery.RequestFromPeers + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New return addr, netStore, delivery, cleanup, nil } @@ -92,13 +93,13 @@ func 
newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *syn return nil, nil, nil, err } - netStore.RemoteGet = delivery.RequestFromPeers + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New return netStore, delivery, cleanup, nil } // newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc -func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf storage.RemoteGetFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) { +func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) { addr := network.NewAddr(ctx.Config.Node()) netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr) @@ -106,7 +107,7 @@ func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket return nil, nil, nil, nil, err } - netStore.RemoteGet = rf + netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New return addr, netStore, delivery, cleanup, nil } @@ -119,9 +120,14 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map, return nil, nil, nil, err } - netStore := storage.NewNetStore(localStore, enode.ID{}) - lnetStore := storage.NewLNetStore(netStore) - fileStore := storage.NewFileStore(lnetStore, storage.NewFileStoreParams(), chunk.NewTags()) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + localStore.Close() + localStoreCleanup() + return nil, nil, nil, err + } + + fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams(), chunk.NewTags()) kad := network.NewKademlia(addr.Over(), network.NewKadParams()) delivery := NewDelivery(kad, netStore) @@ -161,11 +167,15 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste return nil, nil, nil, 
nil, err } - netStore := storage.NewNetStore(localStore, enode.ID{}) + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + localStore.Close() + removeDataDir() + return nil, nil, nil, nil, err + } delivery := NewDelivery(to, netStore) - netStore.RemoteGet = delivery.RequestFromPeers - + netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New intervalsStore := state.NewInmemoryStore() streamer := NewRegistry(addr.ID(), delivery, netStore, intervalsStore, registryOptions, nil) diff --git a/network/stream/delivery.go b/network/stream/delivery.go index 794f9c9f03..43740a0514 100644 --- a/network/stream/delivery.go +++ b/network/stream/delivery.go @@ -27,9 +27,9 @@ import ( "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/network" - "github.com/ethersphere/swarm/network/timeouts" "github.com/ethersphere/swarm/spancontext" "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/tracing" opentracing "github.com/opentracing/opentracing-go" olog "github.com/opentracing/opentracing-go/log" ) @@ -39,6 +39,9 @@ var ( handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil) retrieveChunkFail = metrics.NewRegisteredCounter("network.stream.retrieve_chunks_fail.count", nil) + requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil) + requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil) + lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil) ) @@ -59,42 +62,52 @@ func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery { // RetrieveRequestMsg is the protocol msg for chunk retrieve requests type RetrieveRequestMsg struct { - Addr storage.Address + Addr storage.Address + SkipCheck bool + HopCount uint8 } func (d *Delivery) 
handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error { - log.Trace("handle retrieve request", "peer", sp.ID(), "hash", req.Addr) + log.Trace("received request", "peer", sp.ID(), "hash", req.Addr) handleRetrieveRequestMsgCount.Inc(1) - ctx, osp := spancontext.StartSpan( + var osp opentracing.Span + ctx, osp = spancontext.StartSpan( ctx, - "handle.retrieve.request") + "stream.handle.retrieve") osp.LogFields(olog.String("ref", req.Addr.String())) - defer osp.Finish() + var cancel func() + // TODO: do something with this hardcoded timeout, maybe use TTL in the future + ctx = context.WithValue(ctx, "peer", sp.ID().String()) + ctx = context.WithValue(ctx, "hopcount", req.HopCount) + ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout) - ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout) - defer cancel() + go func() { + select { + case <-ctx.Done(): + case <-d.quit: + } + cancel() + }() - r := &storage.Request{ - Addr: req.Addr, - Origin: sp.ID(), - } - chunk, err := d.netStore.Get(ctx, chunk.ModeGetRequest, r) - if err != nil { - retrieveChunkFail.Inc(1) - log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "err", err) - return nil - } + go func() { + defer osp.Finish() + ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr) + if err != nil { + retrieveChunkFail.Inc(1) + log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err) + return + } + syncing := false - log.Trace("retrieve request, delivery", "ref", req.Addr, "peer", sp.ID()) - syncing := false - err = sp.Deliver(ctx, chunk, 0, syncing) - if err != nil { - log.Error("sp.Deliver errored", "err", err) - } - osp.LogFields(olog.Bool("delivered", true)) + err = sp.Deliver(ctx, ch, Top, syncing) + if err != nil { + log.Warn("ERROR in handleRetrieveRequestMsg", "err", err) + } + osp.LogFields(olog.Bool("delivered", true)) + }() 
return nil } @@ -176,166 +189,57 @@ func (d *Delivery) Close() { close(d.quit) } -// getOriginPo returns the originPo if the incoming Request has an Origin -// if our node is the first node that requests this chunk, then we don't have an Origin, -// and return -1 -// this is used only for tracing, and can probably be refactor so that we don't have to -// iterater over Kademlia -func (d *Delivery) getOriginPo(req *storage.Request) int { - originPo := -1 - - d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool { - id := p.ID() - - // get po between chunk and origin - if req.Origin.String() == id.String() { - originPo = po - return false - } - - return true - }) - - return originPo -} - -// FindPeer is returning the closest peer from Kademlia that a chunk -// request hasn't already been sent to -func (d *Delivery) FindPeer(ctx context.Context, req *storage.Request) (*Peer, error) { +// RequestFromPeers sends a chunk retrieve request to a peer +// The most eligible peer that hasn't already been sent to is chosen +// TODO: define "eligible" +func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) { + requestFromPeersCount.Inc(1) var sp *Peer - var err error + spID := req.Source - osp, _ := ctx.Value("remote.fetch").(opentracing.Span) - - // originPo - proximity of the node that made the request; -1 if the request originator is our node; - // myPo - this node's proximity with the requested chunk - // selectedPeerPo - kademlia suggested node's proximity with the requested chunk (computed further below) - originPo := d.getOriginPo(req) - myPo := chunk.Proximity(req.Addr, d.kad.BaseAddr()) - selectedPeerPo := -1 - - depth := d.kad.NeighbourhoodDepth() - - if osp != nil { - osp.LogFields(olog.Int("originPo", originPo)) - osp.LogFields(olog.Int("depth", depth)) - osp.LogFields(olog.Int("myPo", myPo)) - } - - // do not forward requests if origin proximity is bigger than our node's proximity - // this means 
that origin is closer to the chunk - if originPo > myPo { - return nil, errors.New("not forwarding request, origin node is closer to chunk than this node") - } - - d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool { - id := p.ID() - - // skip light nodes - if p.LightNode { - return true + if spID != nil { + sp = d.getPeer(*spID) + if sp == nil { + return nil, nil, fmt.Errorf("source peer %v not found", spID.String()) } - - // do not send request back to peer who asked us. maybe merge with SkipPeer at some point - if req.Origin.String() == id.String() { - return true - } - - // skip peers that we have already tried - if req.SkipPeer(id.String()) { - log.Trace("findpeer skip peer", "peer", id, "ref", req.Addr.String()) - return true - } - - if myPo < depth { // chunk is NOT within the neighbourhood - if po <= myPo { // always choose a peer strictly closer to chunk than us - log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String()) - return false - } else { - log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String()) + } else { + d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool { + id := p.ID() + if p.LightNode { + // skip light nodes + return true } - } else { // chunk IS WITHIN neighbourhood - if po < depth { // do not select peer outside the neighbourhood. 
But allows peers further from the chunk than us - log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String()) - return false - } else if po <= originPo { // avoid loop in neighbourhood, so not forward when a request comes from the neighbourhood - log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String()) - return false - } else { - log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String()) + if req.SkipPeer(id.String()) { + log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id) + return true } - } - - // if selected peer is not in the depth (2nd condition; if depth <= po, then peer is in nearest neighbourhood) - // and they have a lower po than ours, return error - if po < myPo && depth > po { - log.Trace("findpeer4 skip peer because origin was closer", "originpo", originPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String()) - - err = fmt.Errorf("not asking peers further away from origin; ref=%s originpo=%v po=%v depth=%v myPo=%v", req.Addr.String(), originPo, po, depth, myPo) + sp = d.getPeer(id) + // sp is nil, when we encounter a peer that is not registered for delivery, i.e. 
doesn't support the `stream` protocol + if sp == nil { + return true + } + spID = &id return false + }) + if sp == nil { + return nil, nil, errors.New("no peer found") } - - // if chunk falls in our nearest neighbourhood (1st condition), but suggested peer is not in - // the nearest neighbourhood (2nd condition), don't forward the request to suggested peer - if depth <= myPo && depth > po { - log.Trace("findpeer5 skip peer because depth", "originpo", originPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String()) - - err = fmt.Errorf("not going outside of depth; ref=%s originpo=%v po=%v depth=%v myPo=%v", req.Addr.String(), originPo, po, depth, myPo) - return false - } - - sp = d.getPeer(id) - - // sp could be nil, if we encountered a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol - // if sp is not nil, then we have selected the next peer and we stop iterating - // if sp is nil, we continue iterating - if sp != nil { - selectedPeerPo = po - - return false - } - - // continue iterating - return true - }) - - if osp != nil { - osp.LogFields(olog.Int("selectedPeerPo", selectedPeerPo)) - } - - if err != nil { - return nil, err - } - - if sp == nil { - return nil, errors.New("no peer found") - } - - return sp, nil -} - -// RequestFromPeers sends a chunk retrieve request to the next found peer -func (d *Delivery) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) { - metrics.GetOrRegisterCounter("delivery.requestfrompeers", nil).Inc(1) - - sp, err := d.FindPeer(ctx, req) - if err != nil { - log.Trace(err.Error()) - return nil, err } // setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip // this span will finish only when delivery is handled (or times out) - r := &RetrieveRequestMsg{ - Addr: req.Addr, - } - log.Trace("sending retrieve request", "ref", r.Addr, "peer", sp.ID().String(), "origin", localID) - 
err = sp.Send(ctx, r) + ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request") + ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr)) + log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr) + err := sp.SendPriority(ctx, &RetrieveRequestMsg{ + Addr: req.Addr, + SkipCheck: req.SkipCheck, + HopCount: req.HopCount, + }, Top) if err != nil { - log.Error(err.Error()) - return nil, err + return nil, nil, err } + requestFromPeersEachCount.Inc(1) - spID := sp.ID() - return &spID, nil + return spID, sp.quit, nil } diff --git a/network/stream/delivery_test.go b/network/stream/delivery_test.go index 1e4d5a1254..3f124522ca 100644 --- a/network/stream/delivery_test.go +++ b/network/stream/delivery_test.go @@ -19,17 +19,26 @@ package stream import ( "bytes" "context" + "errors" + "fmt" + "sync" "testing" "time" + "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/network" pq "github.com/ethersphere/swarm/network/priorityqueue" + "github.com/ethersphere/swarm/network/simulation" "github.com/ethersphere/swarm/p2p/protocols" + "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/storage" + "github.com/ethersphere/swarm/testutil" ) //Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet) @@ -151,12 +160,18 @@ func TestRequestFromPeers(t *testing.T) { streamer: r, } r.setPeer(sp) - req := storage.NewRequest(storage.Address(hash0[:])) - id, err := delivery.FindPeer(context.TODO(), req) + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + ctx := context.Background() + id, _, err := delivery.RequestFromPeers(ctx, req) + if err != nil { 
t.Fatal(err) } - if id.ID() != dummyPeerID { + if *id != dummyPeerID { t.Fatalf("Expected an id, got %v", id) } } @@ -186,10 +201,15 @@ func TestRequestFromPeersWithLightNode(t *testing.T) { } r.setPeer(sp) - req := storage.NewRequest(storage.Address(hash0[:])) + req := network.NewRequest( + storage.Address(hash0[:]), + true, + &sync.Map{}, + ) + ctx := context.Background() // making a request which should return with "no peer found" - _, err := delivery.FindPeer(context.TODO(), req) + _, _, err := delivery.RequestFromPeers(ctx, req) expectedError := "no peer found" if err.Error() != expectedError { @@ -280,3 +300,293 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) { } } + +func TestDeliveryFromNodes(t *testing.T) { + testDeliveryFromNodes(t, 2, dataChunkCount, true) + testDeliveryFromNodes(t, 2, dataChunkCount, false) + testDeliveryFromNodes(t, 4, dataChunkCount, true) + testDeliveryFromNodes(t, 4, dataChunkCount, false) + + if testutil.RaceEnabled { + // Travis cannot handle more nodes with -race; would time out. 
+ return + } + + testDeliveryFromNodes(t, 8, dataChunkCount, true) + testDeliveryFromNodes(t, 8, dataChunkCount, false) + testDeliveryFromNodes(t, 16, dataChunkCount, true) + testDeliveryFromNodes(t, 16, dataChunkCount, false) +} + +func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) { + t.Helper() + t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } + + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + Syncing: SyncingDisabled, + }, nil) + bucket.Store(bucketKeyRegistry, r) + + cleanup = func() { + r.Close() + clean() + } + + return r, cleanup, nil + }, + }) + defer sim.Close() + + log.Info("Adding nodes to simulation") + _, err := sim.AddNodesAndConnectChain(nodes) + if err != nil { + t.Fatal(err) + } + + log.Info("Starting simulation") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + nodeIDs := sim.UpNodeIDs() + //determine the pivot node to be the first node of the simulation + pivot := nodeIDs[0] + + //distribute chunks of a random file into Stores of nodes 1 to nodes + //we will do this by creating a file store with an underlying round-robin store: + //the file store will create a hash for the uploaded file, but every chunk will be + //distributed to different nodes via round-robin scheduling + log.Debug("Writing file to round-robin file store") + //to do this, we create an array for chunkstores (length minus one, the pivot node) + stores := make([]storage.ChunkStore, len(nodeIDs)-1) + //we then 
need to get all stores from the sim.... + lStores := sim.NodesItems(bucketKeyStore) + i := 0 + //...iterate the buckets... + for id, bucketVal := range lStores { + //...and remove the one which is the pivot node + if id == pivot { + continue + } + //the other ones are added to the array... + stores[i] = bucketVal.(storage.ChunkStore) + i++ + } + //...which then gets passed to the round-robin file store + roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags()) + //now we can actually upload a (random) file to the round-robin store + size := chunkCount * chunkSize + log.Debug("Storing data to file store") + fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false) + // wait until all chunks stored + if err != nil { + return err + } + err = wait(ctx) + if err != nil { + return err + } + + //get the pivot node's filestore + item, ok := sim.NodeItem(pivot, bucketKeyFileStore) + if !ok { + return fmt.Errorf("No filestore") + } + pivotFileStore := item.(*storage.FileStore) + log.Debug("Starting retrieval routine") + retErrC := make(chan error) + go func() { + // start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks + // we must wait for the peer connections to have started before requesting + n, err := readAll(pivotFileStore, fileHash) + log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err) + retErrC <- err + }() + + disconnected := watchDisconnections(ctx, sim) + defer func() { + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") + } + }() + + //finally check that the pivot node gets all chunks via the root hash + log.Debug("Check retrieval") + success := true + var total int64 + total, err = readAll(pivotFileStore, fileHash) + if err != nil { + return err + } + log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", 
fileHash, total, size, err)) + if err != nil || total != int64(size) { + success = false + } + + if !success { + return fmt.Errorf("Test failed, chunks not available on all nodes") + } + if err := <-retErrC; err != nil { + return fmt.Errorf("requesting chunks: %v", err) + } + log.Debug("Test terminated successfully") + return nil + }) + if result.Error != nil { + t.Fatal(result.Error) + } + }) +} + +func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) { + for chunks := 32; chunks <= 128; chunks *= 2 { + for i := 2; i < 32; i *= 2 { + b.Run( + fmt.Sprintf("nodes=%v,chunks=%v", i, chunks), + func(b *testing.B) { + benchmarkDeliveryFromNodes(b, i, chunks, true) + }, + ) + } + } +} + +func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) { + for chunks := 32; chunks <= 128; chunks *= 2 { + for i := 2; i < 32; i *= 2 { + b.Run( + fmt.Sprintf("nodes=%v,chunks=%v", i, chunks), + func(b *testing.B) { + benchmarkDeliveryFromNodes(b, i, chunks, false) + }, + ) + } + } +} + +func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) { + sim := simulation.New(map[string]simulation.ServiceFunc{ + "streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) { + addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket) + if err != nil { + return nil, nil, err + } + + r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{ + SkipCheck: skipCheck, + Syncing: SyncingDisabled, + SyncUpdateDelay: 0, + }, nil) + bucket.Store(bucketKeyRegistry, r) + + cleanup = func() { + r.Close() + clean() + } + + return r, cleanup, nil + }, + }) + defer sim.Close() + + log.Info("Initializing test config") + _, err := sim.AddNodesAndConnectChain(nodes) + if err != nil { + b.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) { + nodeIDs := 
sim.UpNodeIDs() + node := nodeIDs[len(nodeIDs)-1] + + item, ok := sim.NodeItem(node, bucketKeyFileStore) + if !ok { + return errors.New("No filestore") + } + remoteFileStore := item.(*storage.FileStore) + + pivotNode := nodeIDs[0] + item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore) + if !ok { + return errors.New("No filestore") + } + netStore := item.(*storage.NetStore) + + if _, err := sim.WaitTillHealthy(ctx); err != nil { + return err + } + + disconnected := watchDisconnections(ctx, sim) + defer func() { + if err != nil && disconnected.bool() { + err = errors.New("disconnect events received") + } + }() + // benchmark loop + b.ResetTimer() + b.StopTimer() + Loop: + for i := 0; i < b.N; i++ { + // uploading chunkCount random chunks to the last node + hashes := make([]storage.Address, chunkCount) + for i := 0; i < chunkCount; i++ { + // create actual size real chunks + ctx := context.TODO() + hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false) + if err != nil { + return fmt.Errorf("store: %v", err) + } + // wait until all chunks stored + err = wait(ctx) + if err != nil { + return fmt.Errorf("wait store: %v", err) + } + // collect the hashes + hashes[i] = hash + } + // now benchmark the actual retrieval + // netstore.Get is called for each hash in a go routine and errors are collected + b.StartTimer() + errs := make(chan error) + for _, hash := range hashes { + go func(h storage.Address) { + _, err := netStore.Get(ctx, chunk.ModeGetRequest, h) + log.Warn("test check netstore get", "hash", h, "err", err) + errs <- err + }(hash) + } + // count and report retrieval errors + // if there are misses then chunk timeout is too low for the distance and volume (?) 
+ var total, misses int + for err := range errs { + if err != nil { + log.Warn(err.Error()) + misses++ + } + total++ + if total == chunkCount { + break + } + } + b.StopTimer() + + if misses > 0 { + err = fmt.Errorf("%v chunk not found out of %v", misses, total) + break Loop + } + } + return err + }) + if result.Error != nil { + b.Fatal(result.Error) + } + +} diff --git a/network/stream/intervals_test.go b/network/stream/intervals_test.go index 49972e54e6..2479adbf66 100644 --- a/network/stream/intervals_test.go +++ b/network/stream/intervals_test.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethersphere/swarm/network/simulation" - "github.com/ethersphere/swarm/network/timeouts" "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/storage" "github.com/ethersphere/swarm/testutil" @@ -76,7 +75,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) { return newTestExternalClient(netStore), nil }) r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) { - return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys), nil + return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil }) cleanup := func() { @@ -299,42 +298,38 @@ func newTestExternalClient(netStore *storage.NetStore) *testExternalClient { } } -func (c *testExternalClient) NeedData(ctx context.Context, key []byte) (bool, func(context.Context) error) { - fi, loaded, ok := c.netStore.GetOrCreateFetcher(ctx, key, "syncer") - if !ok { - return loaded, nil +func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { + wait := c.netStore.FetchFunc(ctx, storage.Address(hash)) + if wait == nil { + return nil } - select { - case c.hashes <- key: + case c.hashes <- hash: case <-ctx.Done(): log.Warn("testExternalClient NeedData context", "err", ctx.Err()) - 
return false, func(_ context.Context) error { + return func(_ context.Context) error { return ctx.Err() } } - - return loaded, func(ctx context.Context) error { - select { - case <-fi.Delivered: - case <-time.After(timeouts.SyncerClientWaitTimeout): - return fmt.Errorf("chunk not delivered through syncing after %dsec. ref=%s", timeouts.SyncerClientWaitTimeout, fmt.Sprintf("%x", key)) - } - return nil - } + return wait } func (c *testExternalClient) Close() {} type testExternalServer struct { t string + keyFunc func(key []byte, index uint64) sessionAt uint64 maxKeys uint64 } -func newTestExternalServer(t string, sessionAt, maxKeys uint64) *testExternalServer { +func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer { + if keyFunc == nil { + keyFunc = binary.BigEndian.PutUint64 + } return &testExternalServer{ t: t, + keyFunc: keyFunc, sessionAt: sessionAt, maxKeys: maxKeys, } @@ -350,7 +345,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6 } b := make([]byte, HashSize*(to-from+1)) for i := from; i <= to; i++ { - binary.BigEndian.PutUint64(b[(i-from)*HashSize:(i-from+1)*HashSize], i) + s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i) } return b, from, to, nil } diff --git a/network/stream/messages.go b/network/stream/messages.go index 33320db289..cb51229b80 100644 --- a/network/stream/messages.go +++ b/network/stream/messages.go @@ -225,14 +225,9 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg for i := 0; i < lenHashes; i += HashSize { hash := hashes[i : i+HashSize] - log.Trace("checking offered hash", "ref", fmt.Sprintf("%x", hash)) - - if _, wait := c.NeedData(ctx, hash); wait != nil { + if wait := c.NeedData(ctx, hash); wait != nil { ctr++ - - // set the bit, so create a request want.Set(i/HashSize, true) - log.Trace("need data", "ref", fmt.Sprintf("%x", hash), "request", true) // measure how long it takes before we mark 
chunks for retrieval, and actually send the request if !wantDelaySet { diff --git a/network/stream/snapshot_sync_test.go b/network/stream/snapshot_sync_test.go index 39d5920273..a60d6bcfde 100644 --- a/network/stream/snapshot_sync_test.go +++ b/network/stream/snapshot_sync_test.go @@ -63,8 +63,8 @@ const ( // Tests in this file should not request chunks from peers. // This function will panic indicating that there is a problem if request has been made. -func dummyRequestFromPeers(_ context.Context, req *storage.Request, _ enode.ID) (*enode.ID, error) { - panic(fmt.Sprintf("unexpected request: address %s", req.Addr.String())) +func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) { + panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String())) } //This test is a syncing test for nodes. diff --git a/network/stream/stream.go b/network/stream/stream.go index 3465acae75..6c3dbd0152 100644 --- a/network/stream/stream.go +++ b/network/stream/stream.go @@ -543,7 +543,7 @@ func (c *client) NextInterval() (start, end uint64, err error) { // Client interface for incoming peer Streamer type Client interface { - NeedData(context.Context, []byte) (bool, func(context.Context) error) + NeedData(context.Context, []byte) func(context.Context) error Close() } diff --git a/network/stream/streamer_test.go b/network/stream/streamer_test.go index 4e01056204..c631900539 100644 --- a/network/stream/streamer_test.go +++ b/network/stream/streamer_test.go @@ -93,20 +93,20 @@ func newTestClient(t string) *testClient { } } -func (self *testClient) NeedData(ctx context.Context, hash []byte) (bool, func(context.Context) error) { +func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { self.receivedHashes[string(hash)] = hash if bytes.Equal(hash, hash0[:]) { - return false, func(context.Context) error { + return func(context.Context) error { <-self.wait0 return nil } 
} else if bytes.Equal(hash, hash2[:]) { - return false, func(context.Context) error { + return func(context.Context) error { <-self.wait2 return nil } } - return false, nil + return nil } func (self *testClient) Close() {} diff --git a/network/stream/syncer.go b/network/stream/syncer.go index 6fc1202ad3..f73d959ce8 100644 --- a/network/stream/syncer.go +++ b/network/stream/syncer.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/log" - "github.com/ethersphere/swarm/network/timeouts" "github.com/ethersphere/swarm/storage" ) @@ -74,7 +73,7 @@ func (s *SwarmSyncerServer) Close() { // GetData retrieves the actual chunk from netstore func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) { - ch, err := s.netStore.Store.Get(ctx, chunk.ModeGetSync, storage.Address(key)) + ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key)) if err != nil { return nil, err } @@ -199,24 +198,9 @@ func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) { }) } -func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (loaded bool, wait func(context.Context) error) { - start := time.Now() - - fi, loaded, ok := s.netStore.GetOrCreateFetcher(ctx, key, "syncer") - if !ok { - return loaded, nil - } - - return loaded, func(ctx context.Context) error { - select { - case <-fi.Delivered: - metrics.GetOrRegisterResettingTimer(fmt.Sprintf("fetcher.%s.syncer", fi.CreatedBy), nil).UpdateSince(start) - case <-time.After(timeouts.SyncerClientWaitTimeout): - metrics.GetOrRegisterCounter("fetcher.syncer.timeout", nil).Inc(1) - return fmt.Errorf("chunk not delivered through syncing after %dsec. 
ref=%s", timeouts.SyncerClientWaitTimeout, fmt.Sprintf("%x", key)) - } - return nil - } +// NeedData +func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) { + return s.netStore.FetchFunc(ctx, key) } func (s *SwarmSyncerClient) Close() {} diff --git a/network/timeouts/timeouts.go b/network/timeouts/timeouts.go deleted file mode 100644 index b92af3b47b..0000000000 --- a/network/timeouts/timeouts.go +++ /dev/null @@ -1,24 +0,0 @@ -package timeouts - -import "time" - -// FailedPeerSkipDelay is the time we consider a peer to be skipped for a particular request/chunk, -// because this peer failed to deliver it during the SearchTimeout interval -var FailedPeerSkipDelay = 20 * time.Second - -// FetcherGlobalTimeout is the max time a node tries to find a chunk for a client, after which it returns a 404 -// Basically this is the amount of time a singleflight request for a given chunk lives -var FetcherGlobalTimeout = 10 * time.Second - -// SearchTimeout is the max time requests wait for a peer to deliver a chunk, after which another peer is tried -var SearchTimeout = 500 * time.Millisecond - -// SyncerClientWaitTimeout is the max time a syncer client waits for a chunk to be delivered during syncing -var SyncerClientWaitTimeout = 20 * time.Second - -// Within handleOfferedHashesMsg - how long to wait for a given batch of chunks to be delivered by the peer offering them -var SyncBatchTimeout = 10 * time.Second - -// Within SwarmSyncerServer - If at least one chunk is added to the batch and no new chunks -// are added in BatchTimeout period, the batch will be returned. 
-var BatchTimeout = 2 * time.Second diff --git a/storage/feed/handler.go b/storage/feed/handler.go index cbaaac1819..5b4bb60507 100644 --- a/storage/feed/handler.go +++ b/storage/feed/handler.go @@ -192,13 +192,12 @@ func (h *Handler) Lookup(ctx context.Context, query *Query) (*cacheEntry, error) ctx, cancel := context.WithTimeout(ctx, defaultRetrieveTimeout) defer cancel() - r := storage.NewRequest(id.Addr()) - ch, err := h.chunkStore.Get(ctx, chunk.ModeGetLookup, r) + ch, err := h.chunkStore.Get(ctx, chunk.ModeGetLookup, id.Addr()) if err != nil { - if err == context.DeadlineExceeded || err == storage.ErrNoSuitablePeer { // chunk not found + if err == context.DeadlineExceeded { // chunk not found return nil, nil } - return nil, err + return nil, err //something else happened or context was cancelled. } var request Request diff --git a/storage/feed/lookup/lookup.go b/storage/feed/lookup/lookup.go index 9e9fa56588..4b233a0e07 100644 --- a/storage/feed/lookup/lookup.go +++ b/storage/feed/lookup/lookup.go @@ -58,8 +58,6 @@ var TimeAfter = time.After // It should return if a value is found, but its timestamp is higher than "now" // It should only return an error in case the handler wants to stop the // lookup process entirely. 
-// If the context is canceled, it must return context.Canceled - type ReadFunc func(ctx context.Context, epoch Epoch, now uint64) (interface{}, error) // NoClue is a hint that can be provided when the Lookup caller does not have diff --git a/storage/feed/testutil.go b/storage/feed/testutil.go index a1e9cf030c..7e2a73f85c 100644 --- a/storage/feed/testutil.go +++ b/storage/feed/testutil.go @@ -18,8 +18,8 @@ package feed import ( "context" - "errors" "path/filepath" + "sync" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethersphere/swarm/chunk" @@ -39,6 +39,17 @@ func (t *TestHandler) Close() { t.chunkStore.Close() } +type mockNetFetcher struct{} + +func (m *mockNetFetcher) Request(hopCount uint8) { +} +func (m *mockNetFetcher) Offer(source *enode.ID) { +} + +func newFakeNetFetcher(context.Context, storage.Address, *sync.Map) storage.NetFetcher { + return &mockNetFetcher{} +} + // NewTestHandler creates Handler object to be used for testing purposes. func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) { path := filepath.Join(datadir, testDbDirName) @@ -51,10 +62,11 @@ func NewTestHandler(datadir string, params *HandlerParams) (*TestHandler, error) localStore := chunk.NewValidatorStore(db, storage.NewContentAddressValidator(storage.MakeHashFunc(feedsHashAlgorithm)), fh) - netStore := storage.NewNetStore(localStore, enode.ID{}) - netStore.RemoteGet = func(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) { - return nil, errors.New("not found") + netStore, err := storage.NewNetStore(localStore, nil) + if err != nil { + return nil, err } + netStore.NewNetFetcherFunc = newFakeNetFetcher fh.SetStore(netStore) return &TestHandler{fh}, nil } diff --git a/storage/filestore.go b/storage/filestore.go index ff5ffe3970..2c2a625a18 100644 --- a/storage/filestore.go +++ b/storage/filestore.go @@ -39,8 +39,9 @@ implementation for storage or retrieval. 
*/ const ( - defaultLDBCapacity = 5000000 // capacity for LevelDB, by default 5*10^6*4096 bytes == 20GB - defaultCacheCapacity = 10000 // capacity for in-memory chunks' cache + defaultLDBCapacity = 5000000 // capacity for LevelDB, by default 5*10^6*4096 bytes == 20GB + defaultCacheCapacity = 10000 // capacity for in-memory chunks' cache + defaultChunkRequestsCacheCapacity = 5000000 // capacity for container holding outgoing requests for chunks. should be set to LevelDB capacity ) type FileStore struct { diff --git a/storage/lnetstore.go b/storage/lnetstore.go deleted file mode 100644 index f13c99b5fa..0000000000 --- a/storage/lnetstore.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package storage - -import ( - "context" - - "github.com/ethersphere/swarm/chunk" - "github.com/ethersphere/swarm/network/timeouts" -) - -// LNetStore is a wrapper of NetStore, which implements the chunk.Store interface. It is used only by the FileStore, -// the component used by the Swarm API to store and retrieve content and to split and join chunks. 
-type LNetStore struct { - *NetStore -} - -// NewLNetStore is a constructor for LNetStore -func NewLNetStore(store *NetStore) *LNetStore { - return &LNetStore{ - NetStore: store, - } -} - -// Get converts a chunk reference to a chunk Request (with empty Origin), handled by the NetStore, and -// returns the requested chunk, or error. -func (n *LNetStore) Get(ctx context.Context, mode chunk.ModeGet, ref Address) (ch Chunk, err error) { - ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout) - defer cancel() - - return n.NetStore.Get(ctx, mode, NewRequest(ref)) -} diff --git a/storage/netstore.go b/storage/netstore.go index 8f92d7aefd..ac4a33d4b9 100644 --- a/storage/netstore.go +++ b/storage/netstore.go @@ -18,291 +18,318 @@ package storage import ( "context" - "errors" + "encoding/hex" "fmt" "sync" + "sync/atomic" "time" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/log" - "github.com/ethersphere/swarm/network/timeouts" "github.com/ethersphere/swarm/spancontext" - lru "github.com/hashicorp/golang-lru" - - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/opentracing/opentracing-go" olog "github.com/opentracing/opentracing-go/log" "github.com/syndtr/goleveldb/leveldb" - "golang.org/x/sync/singleflight" + + lru "github.com/hashicorp/golang-lru" ) -const ( - // capacity for the fetchers LRU cache - fetchersCapacity = 500000 +type ( + NewNetFetcherFunc func(ctx context.Context, addr Address, peers *sync.Map) NetFetcher ) -var ( - ErrNoSuitablePeer = errors.New("no suitable peer") -) - -// Fetcher is a struct which maintains state of remote requests. 
-// Fetchers are stored in fetchers map and signal to all interested parties if a given chunk is delivered -// the mutex controls who closes the channel, and make sure we close the channel only once -type Fetcher struct { - Delivered chan struct{} // when closed, it means that the chunk this Fetcher refers to is delivered - - // it is possible for multiple actors to be delivering the same chunk, - // for example through syncing and through retrieve request. however we want the `Delivered` channel to be closed only - // once, even if we put the same chunk multiple times in the NetStore. - once sync.Once - - CreatedAt time.Time // timestamp when the fetcher was created, used for metrics measuring lifetime of fetchers - CreatedBy string // who created the fetcher - "request" or "syncing", used for metrics measuring lifecycle of fetchers - - RequestedBySyncer bool // whether we have issued at least once a request through Offered/Wanted hashes flow +type NetFetcher interface { + Request(hopCount uint8) + Offer(source *enode.ID) } -// NewFetcher is a constructor for a Fetcher -func NewFetcher() *Fetcher { - return &Fetcher{make(chan struct{}), sync.Once{}, time.Now(), "", false} -} - -// SafeClose signals to interested parties (those waiting for a signal on fi.Delivered) that a chunk is delivered. -// It closes the fi.Delivered channel through the sync.Once object, because it is possible for a chunk to be -// delivered multiple times concurrently. 
-func (fi *Fetcher) SafeClose() { - fi.once.Do(func() { - close(fi.Delivered) - }) -} - -type RemoteGetFunc func(ctx context.Context, req *Request, localID enode.ID) (*enode.ID, error) - -// NetStore is an extension of LocalStore +// NetStore is an extension of local storage // it implements the ChunkStore interface -// on request it initiates remote cloud retrieval +// on request it initiates remote cloud retrieval using a fetcher +// fetchers are unique to a chunk and are stored in fetchers LRU memory cache +// fetchFuncFactory is a factory object to create a fetch function for a specific chunk address type NetStore struct { chunk.Store - localID enode.ID // our local enode - used when issuing RetrieveRequests - fetchers *lru.Cache - putMu sync.Mutex - requestGroup singleflight.Group - RemoteGet RemoteGetFunc + mu sync.Mutex + fetchers *lru.Cache + NewNetFetcherFunc NewNetFetcherFunc + closeC chan struct{} } -// NewNetStore creates a new NetStore using the provided chunk.Store and localID of the node. -func NewNetStore(store chunk.Store, localID enode.ID) *NetStore { - fetchers, _ := lru.New(fetchersCapacity) +var fetcherTimeout = 2 * time.Minute // timeout to cancel the fetcher even if requests are coming in - return &NetStore{ - fetchers: fetchers, - Store: store, - localID: localID, +// NewNetStore creates a new NetStore object using the given local store. newFetchFunc is a +// constructor function that can create a fetch function for a specific chunk address. 
+func NewNetStore(store chunk.Store, nnf NewNetFetcherFunc) (*NetStore, error) { + fetchers, err := lru.New(defaultChunkRequestsCacheCapacity) + if err != nil { + return nil, err } + return &NetStore{ + Store: store, + fetchers: fetchers, + NewNetFetcherFunc: nnf, + closeC: make(chan struct{}), + }, nil } // Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in // the fetchers cache func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) (bool, error) { - n.putMu.Lock() - defer n.putMu.Unlock() + n.mu.Lock() + defer n.mu.Unlock() - log.Trace("netstore.put", "ref", ch.Address().String(), "mode", mode) - - // put the chunk to the localstore, there should be no error + // put to the chunk to the store, there should be no error exists, err := n.Store.Put(ctx, mode, ch) if err != nil { return exists, err } - // notify RemoteGet (or SwarmSyncerClient) about a chunk delivery and it being stored - fi, ok := n.fetchers.Get(ch.Address().String()) - if ok { - // we need SafeClose, because it is possible for a chunk to both be - // delivered through syncing and through a retrieve request - fii := fi.(*Fetcher) - fii.SafeClose() - log.Trace("netstore.put chunk delivered and stored", "ref", ch.Address().String()) - - metrics.GetOrRegisterResettingTimer(fmt.Sprintf("netstore.fetcher.lifetime.%s", fii.CreatedBy), nil).UpdateSince(fii.CreatedAt) - - // helper snippet to log if a chunk took way to long to be delivered - slowChunkDeliveryThreshold := 5 * time.Second - if time.Since(fii.CreatedAt) > slowChunkDeliveryThreshold { - log.Trace("netstore.put slow chunk delivery", "ref", ch.Address().String()) - } - - n.fetchers.Remove(ch.Address().String()) + // if chunk is now put in the store, check if there was an active fetcher and call deliver on it + // (this delivers the chunk to requestors via the fetcher) + log.Trace("n.getFetcher", "ref", ch.Address()) + if f := n.getFetcher(ch.Address()); f != nil { + 
log.Trace("n.getFetcher deliver", "ref", ch.Address()) + f.deliver(ctx, ch) } - return exists, nil } +// Get retrieves the chunk from the NetStore DPA synchronously. +// It calls NetStore.get, and if the chunk is not in local Storage +// it calls fetch with the request, which blocks until the chunk +// arrived or context is done +func (n *NetStore) Get(rctx context.Context, mode chunk.ModeGet, ref Address) (Chunk, error) { + chunk, fetch, err := n.get(rctx, mode, ref) + if err != nil { + return nil, err + } + if chunk != nil { + // this is not measuring how long it takes to get the chunk for the localstore, but + // rather just adding a span for clarity when inspecting traces in Jaeger, in order + // to make it easier to reason which is the node that actually delivered a chunk. + _, sp := spancontext.StartSpan( + rctx, + "localstore.get") + defer sp.Finish() + + return chunk, nil + } + return fetch(rctx) +} + +// FetchFunc returns nil if the store contains the given address. Otherwise it returns a wait function, +// which returns after the chunk is available or the context is done +func (n *NetStore) FetchFunc(ctx context.Context, ref Address) func(context.Context) error { + chunk, fetch, _ := n.get(ctx, chunk.ModeGetRequest, ref) + if chunk != nil { + return nil + } + return func(ctx context.Context) error { + _, err := fetch(ctx) + return err + } +} + // Close chunk store -func (n *NetStore) Close() error { +func (n *NetStore) Close() (err error) { + close(n.closeC) + + wg := sync.WaitGroup{} + for _, key := range n.fetchers.Keys() { + if f, ok := n.fetchers.Get(key); ok { + if fetch, ok := f.(*fetcher); ok { + wg.Add(1) + go func(fetch *fetcher) { + defer wg.Done() + fetch.cancel() + + select { + case <-fetch.deliveredC: + case <-fetch.cancelledC: + } + }(fetch) + } + } + } + wg.Wait() + return n.Store.Close() } -// Get retrieves a chunk -// If it is not found in the LocalStore then it uses RemoteGet to fetch from the network. 
-func (n *NetStore) Get(ctx context.Context, mode chunk.ModeGet, req *Request) (Chunk, error) { - metrics.GetOrRegisterCounter("netstore.get", nil).Inc(1) - start := time.Now() +// get attempts at retrieving the chunk from LocalStore +// If it is not found then using getOrCreateFetcher: +// 1. Either there is already a fetcher to retrieve it +// 2. A new fetcher is created and saved in the fetchers cache +// From here on, all Get will hit on this fetcher until the chunk is delivered +// or all fetcher contexts are done. +// It returns a chunk, a fetcher function and an error +// If chunk is nil, the returned fetch function needs to be called with a context to return the chunk. +func (n *NetStore) get(ctx context.Context, mode chunk.ModeGet, ref Address) (Chunk, func(context.Context) (Chunk, error), error) { + n.mu.Lock() + defer n.mu.Unlock() - ref := req.Addr - - log.Trace("netstore.get", "ref", ref.String()) - - ch, err := n.Store.Get(ctx, mode, ref) + chunk, err := n.Store.Get(ctx, mode, ref) if err != nil { - // TODO: fix comparison - we should be comparing against leveldb.ErrNotFound, this error should be wrapped. + // TODO: Fix comparison - we should be comparing against leveldb.ErrNotFound, this error should be wrapped. 
if err != ErrChunkNotFound && err != leveldb.ErrNotFound { - log.Error("localstore get error", "err", err) + log.Debug("Received error from LocalStore other than ErrNotFound", "err", err) } + // The chunk is not available in the LocalStore, let's get the fetcher for it, or create a new one + // if it doesn't exist yet + f := n.getOrCreateFetcher(ctx, ref) + // If the caller needs the chunk, it has to use the returned fetch function to get it + return nil, f.Fetch, nil + } - log.Trace("netstore.chunk-not-in-localstore", "ref", ref.String()) + return chunk, nil, nil +} - v, err, _ := n.requestGroup.Do(ref.String(), func() (interface{}, error) { - // currently we issue a retrieve request if a fetcher - // has already been created by a syncer for that particular chunk. - // so it is possible to - // have 2 in-flight requests for the same chunk - one by a - // syncer (offered/wanted/deliver flow) and one from - // here - retrieve request - fi, _, ok := n.GetOrCreateFetcher(ctx, ref, "request") - if ok { - err := n.RemoteFetch(ctx, req, fi) - if err != nil { - return nil, err - } - } +// getOrCreateFetcher attempts at retrieving an existing fetchers +// if none exists, creates one and saves it in the fetchers cache +// caller must hold the lock +func (n *NetStore) getOrCreateFetcher(ctx context.Context, ref Address) *fetcher { + if f := n.getFetcher(ref); f != nil { + return f + } - ch, err := n.Store.Get(ctx, mode, ref) - if err != nil { - log.Error(err.Error(), "ref", ref) - return nil, errors.New("item should have been in localstore, but it is not") - } + // no fetcher for the given address, we have to create a new one + key := hex.EncodeToString(ref) + // create the context during which fetching is kept alive + cctx, cancel := context.WithTimeout(ctx, fetcherTimeout) + // destroy is called when all requests finish + destroy := func() { + // remove fetcher from fetchers + n.fetchers.Remove(key) + // stop fetcher by cancelling context called when + // all requests 
cancelled/timedout or chunk is delivered + cancel() + } + // peers always stores all the peers which have an active request for the chunk. It is shared + // between fetcher and the NewFetchFunc function. It is needed by the NewFetchFunc because + // the peers which requested the chunk should not be requested to deliver it. + peers := &sync.Map{} - // fi could be nil (when ok == false) if the chunk was added to the NetStore between n.store.Get and the call to n.GetOrCreateFetcher - if fi != nil { - metrics.GetOrRegisterResettingTimer(fmt.Sprintf("fetcher.%s.request", fi.CreatedBy), nil).UpdateSince(start) - } + cctx, sp := spancontext.StartSpan( + cctx, + "netstore.fetcher", + ) - return ch, nil - }) + sp.LogFields(olog.String("ref", ref.String())) + fetcher := newFetcher(sp, ref, n.NewNetFetcherFunc(cctx, ref, peers), destroy, peers, n.closeC) + n.fetchers.Add(key, fetcher) - if err != nil { - log.Trace(err.Error(), "ref", ref) + return fetcher +} + +// getFetcher retrieves the fetcher for the given address from the fetchers cache if it exists, +// otherwise it returns nil +func (n *NetStore) getFetcher(ref Address) *fetcher { + key := hex.EncodeToString(ref) + f, ok := n.fetchers.Get(key) + if ok { + return f.(*fetcher) + } + return nil +} + +// RequestsCacheLen returns the current number of outgoing requests stored in the cache +func (n *NetStore) RequestsCacheLen() int { + return n.fetchers.Len() +} + +// One fetcher object is responsible to fetch one chunk for one address, and keep track of all the +// peers who have requested it and did not receive it yet. 
+type fetcher struct { + addr Address // address of chunk + chunk Chunk // fetcher can set the chunk on the fetcher + deliveredC chan struct{} // chan signalling chunk delivery to requests + cancelledC chan struct{} // chan signalling the fetcher has been cancelled (removed from fetchers in NetStore) + netFetcher NetFetcher // remote fetch function to be called with a request source taken from the context + cancel func() // cleanup function for the remote fetcher to call when all upstream contexts are called + peers *sync.Map // the peers which asked for the chunk + requestCnt int32 // number of requests on this chunk. If all the requests are done (delivered or context is done) the cancel function is called + deliverOnce *sync.Once // guarantees that we only close deliveredC once + span opentracing.Span // measure retrieve time per chunk +} + +// newFetcher creates a new fetcher object for the given addr. fetch is the function which actually +// does the retrieval (in non-test cases this is coming from the network package). cancel function is +// called either +// 1. when the chunk has been fetched all peers have been either notified or their context has been done +// 2. the chunk has not been fetched but all context from all the requests has been done +// The peers map stores all the peers which have requested chunk. +func newFetcher(span opentracing.Span, addr Address, nf NetFetcher, cancel func(), peers *sync.Map, closeC chan struct{}) *fetcher { + cancelOnce := &sync.Once{} // cancel should only be called once + return &fetcher{ + addr: addr, + deliveredC: make(chan struct{}), + deliverOnce: &sync.Once{}, + cancelledC: closeC, + netFetcher: nf, + cancel: func() { + cancelOnce.Do(func() { + cancel() + }) + }, + peers: peers, + span: span, + } +} + +// Fetch fetches the chunk synchronously, it is called by NetStore.Get if the chunk is not available +// locally.
+func (f *fetcher) Fetch(rctx context.Context) (Chunk, error) { + atomic.AddInt32(&f.requestCnt, 1) + defer func() { + // if all the requests are done the fetcher can be cancelled + if atomic.AddInt32(&f.requestCnt, -1) == 0 { + f.cancel() + } + f.span.Finish() + }() + + // The peer asking for the chunk. Store in the shared peers map, but delete after the request + // has been delivered + peer := rctx.Value("peer") + if peer != nil { + f.peers.Store(peer, time.Now()) + defer f.peers.Delete(peer) + } + + // If there is a source in the context then it is an offer, otherwise a request + sourceIF := rctx.Value("source") + + hopCount, _ := rctx.Value("hopcount").(uint8) + + if sourceIF != nil { + var source enode.ID + if err := source.UnmarshalText([]byte(sourceIF.(string))); err != nil { return nil, err } - - c := v.(Chunk) - - log.Trace("netstore.singleflight returned", "ref", ref.String(), "err", err) - - return c, nil - } - - ctx, ssp := spancontext.StartSpan( - ctx, - "localstore.get") - defer ssp.Finish() - - return ch, nil -} - -// RemoteFetch is handling the retry mechanism when making a chunk request to our peers. -// For a given chunk Request, we call RemoteGet, which selects the next eligible peer and -// issues a RetrieveRequest and we wait for a delivery. If a delivery doesn't arrive within the SearchTimeout -// we retry. 
-func (n *NetStore) RemoteFetch(ctx context.Context, req *Request, fi *Fetcher) error { - // while we haven't timed-out, and while we don't have a chunk, - // iterate over peers and try to find a chunk - metrics.GetOrRegisterCounter("remote.fetch", nil).Inc(1) - - ref := req.Addr - - for { - metrics.GetOrRegisterCounter("remote.fetch.inner", nil).Inc(1) - - ctx, osp := spancontext.StartSpan( - ctx, - "remote.fetch") - osp.LogFields(olog.String("ref", ref.String())) - - log.Trace("remote.fetch", "ref", ref) - - currentPeer, err := n.RemoteGet(ctx, req, n.localID) - if err != nil { - log.Trace(err.Error(), "ref", ref) - osp.LogFields(olog.String("err", err.Error())) - osp.Finish() - return ErrNoSuitablePeer - } - - // add peer to the set of peers to skip from now - log.Trace("remote.fetch, adding peer to skip", "ref", ref, "peer", currentPeer.String()) - req.PeersToSkip.Store(currentPeer.String(), time.Now()) - - select { - case <-fi.Delivered: - log.Trace("remote.fetch, chunk delivered", "ref", ref) - - osp.LogFields(olog.Bool("delivered", true)) - osp.Finish() - return nil - case <-time.After(timeouts.SearchTimeout): - metrics.GetOrRegisterCounter("remote.fetch.timeout.search", nil).Inc(1) - - osp.LogFields(olog.Bool("timeout", true)) - osp.Finish() - break - case <-ctx.Done(): // global fetcher timeout - log.Trace("remote.fetch, fail", "ref", ref) - metrics.GetOrRegisterCounter("remote.fetch.timeout.global", nil).Inc(1) - - osp.LogFields(olog.Bool("fail", true)) - osp.Finish() - return ctx.Err() - } - } -} - -// Has is the storage layer entry point to query the underlying -// database to return if it has a chunk or not. -func (n *NetStore) Has(ctx context.Context, ref Address) (bool, error) { - return n.Store.Has(ctx, ref) -} - -// GetOrCreateFetcher returns the Fetcher for a given chunk, if this chunk is not in the LocalStore. 
-// If the chunk is in the LocalStore, it returns nil for the Fetcher and ok == false -func (n *NetStore) GetOrCreateFetcher(ctx context.Context, ref Address, interestedParty string) (f *Fetcher, loaded bool, ok bool) { - n.putMu.Lock() - defer n.putMu.Unlock() - - has, err := n.Store.Has(ctx, ref) - if err != nil { - log.Error(err.Error()) - } - if has { - return nil, false, false - } - - f = NewFetcher() - v, loaded := n.fetchers.Get(ref.String()) - log.Trace("netstore.has-with-callback.loadorstore", "ref", ref.String(), "loaded", loaded) - if loaded { - f = v.(*Fetcher) + f.netFetcher.Offer(&source) } else { - f.CreatedBy = interestedParty - n.fetchers.Add(ref.String(), f) + f.netFetcher.Request(hopCount) } - // if fetcher created by request, but we get a call from syncer, make sure we issue a second request - if f.CreatedBy != interestedParty && !f.RequestedBySyncer { - f.RequestedBySyncer = true - return f, false, true + // wait until either the chunk is delivered or the context is done + select { + case <-rctx.Done(): + return nil, rctx.Err() + case <-f.deliveredC: + return f.chunk, nil + case <-f.cancelledC: + return nil, fmt.Errorf("fetcher cancelled") } - - return f, loaded, true +} + +// deliver is called by NetStore.Put to notify all pending requests +func (f *fetcher) deliver(ctx context.Context, ch Chunk) { + f.deliverOnce.Do(func() { + f.chunk = ch + // closing the deliveredC channel will terminate ongoing requests + close(f.deliveredC) + log.Trace("n.getFetcher close deliveredC", "ref", ch.Address()) + }) } diff --git a/storage/netstore_test.go b/storage/netstore_test.go new file mode 100644 index 0000000000..41f37eb398 --- /dev/null +++ b/storage/netstore_test.go @@ -0,0 +1,702 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package storage + +import ( + "bytes" + "context" + "crypto/rand" + "errors" + "fmt" + "io/ioutil" + "os" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/storage/localstore" +) + +var sourcePeerID = enode.HexID("99d8594b52298567d2ca3f4c441a5ba0140ee9245e26460d01102a52773c73b9") + +type mockNetFetcher struct { + peers *sync.Map + sources []*enode.ID + peersPerRequest [][]Address + requestCalled bool + offerCalled bool + quit <-chan struct{} + ctx context.Context + hopCounts []uint8 + mu sync.Mutex +} + +func (m *mockNetFetcher) Offer(source *enode.ID) { + m.offerCalled = true + m.sources = append(m.sources, source) +} + +func (m *mockNetFetcher) Request(hopCount uint8) { + m.mu.Lock() + defer m.mu.Unlock() + + m.requestCalled = true + var peers []Address + m.peers.Range(func(key interface{}, _ interface{}) bool { + peers = append(peers, common.FromHex(key.(string))) + return true + }) + m.peersPerRequest = append(m.peersPerRequest, peers) + m.hopCounts = append(m.hopCounts, hopCount) +} + +type mockNetFetchFuncFactory struct { + fetcher *mockNetFetcher +} + +func (m *mockNetFetchFuncFactory) newMockNetFetcher(ctx context.Context, _ Address, peers 
*sync.Map) NetFetcher { + m.fetcher.peers = peers + m.fetcher.quit = ctx.Done() + m.fetcher.ctx = ctx + return m.fetcher +} + +func newTestNetStore(t *testing.T) (netStore *NetStore, fetcher *mockNetFetcher, cleanup func()) { + t.Helper() + + dir, err := ioutil.TempDir("", "swarm-storage-") + if err != nil { + t.Fatal(err) + } + localStore, err := localstore.New(dir, make([]byte, 32), nil) + if err != nil { + os.RemoveAll(dir) + t.Fatal(err) + } + cleanup = func() { + localStore.Close() + os.RemoveAll(dir) + } + + fetcher = new(mockNetFetcher) + mockNetFetchFuncFactory := &mockNetFetchFuncFactory{ + fetcher: fetcher, + } + netStore, err = NewNetStore(localStore, mockNetFetchFuncFactory.newMockNetFetcher) + if err != nil { + cleanup() + t.Fatal(err) + } + return netStore, fetcher, cleanup +} + +// TestNetStoreGetAndPut tests calling NetStore.Get which is blocked until the same chunk is Put. +// After the Put there should no active fetchers, and the context created for the fetcher should +// be cancelled. 
+func TestNetStoreGetAndPut(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + c := make(chan struct{}) // this channel ensures that the gouroutine with the Put does not run earlier than the Get + putErrC := make(chan error) + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { + putErrC <- errors.New("Expected netStore to use a fetcher for the Get call") + return + } + + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) + if err != nil { + putErrC <- fmt.Errorf("Expected no err got %v", err) + return + } + + putErrC <- nil + }() + + close(c) + recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) // this is blocked until the Put above is done + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + + if err := <-putErrC; err != nil { + t.Fatal(err) + } + // the retrieved chunk should be the same as what we Put + if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) { + t.Fatalf("Different chunk received than what was put") + } + // the chunk is already available locally, so there should be no active fetchers waiting for it + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } + + // A fetcher was created when the Get was called (and the chunk was not available). The chunk + // was delivered with the Put call, so the fetcher should be cancelled now. 
+ select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } + +} + +// TestNetStoreGetAfterPut tests calling NetStore.Put and then NetStore.Get. +// After the Put the chunk is available locally, so the Get can just retrieve it from LocalStore, +// there is no need to create fetchers. +func TestNetStoreGetAfterPut(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // First we Put the chunk, so the chunk will be available locally + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + + // Get should retrieve the chunk from LocalStore, without creating fetcher + recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + // the retrieved chunk should be the same as what we Put + if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) { + t.Fatalf("Different chunk received than what was put") + } + // no fetcher offer or request should be created for a locally available chunk + if fetcher.offerCalled || fetcher.requestCalled { + t.Fatal("NetFetcher.offerCalled or requestCalled not expected to be called") + } + // no fetchers should be created for a locally available chunk + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to not have fetcher") + } + +} + +// TestNetStoreGetTimeout tests a Get call for an unavailable chunk and waits for timeout +func TestNetStoreGetTimeout(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + c := make(chan struct{}) // this channel
ensures that the gouroutine does not run earlier than the Get + fetcherErrC := make(chan error) + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + + // check if netStore created a fetcher in the Get call for the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { + fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call") + return + } + + fetcherErrC <- nil + }() + + close(c) + // We call Get on this chunk, which is not in LocalStore. We don't Put it at all, so there will + // be a timeout + _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) + + // Check if the timeout happened + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadLineExceeded err got %v", err) + } + + if err := <-fetcherErrC; err != nil { + t.Fatal(err) + } + + // A fetcher was created, check if it has been removed after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after timeout") + } + + // Check if the fetcher context has been cancelled after the timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreGetCancel tests a Get call for an unavailable chunk, then cancels the context and checks +// the errors +func TestNetStoreGetCancel(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + + c := make(chan struct{}) // this channel ensures that the gouroutine with the cancel does not run earlier than the Get + fetcherErrC := make(chan error, 1) + go func() { + <-c // wait for the Get to be called + time.Sleep(200 * time.Millisecond) // and a little more so it is surely called + // check if netStore created a fetcher in the Get call for 
the unavailable chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { + fetcherErrC <- errors.New("Expected netStore to use a fetcher for the Get call") + return + } + + fetcherErrC <- nil + cancel() + }() + + close(c) + + // We call Get with an unavailable chunk, so it will create a fetcher and wait for delivery + _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) + + if err := <-fetcherErrC; err != nil { + t.Fatal(err) + } + + // After the context is cancelled above Get should return with an error + if err != context.Canceled { + t.Fatalf("Expected context.Canceled err got %v", err) + } + + // A fetcher was created, check if it has been removed after cancel + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after cancel") + } + + // Check if the fetcher context has been cancelled after the request context cancel + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreMultipleGetAndPut tests four Get calls for the same unavailable chunk. 
The chunk is +// delivered with a Put, we have to make sure all Get calls return, and they use a single fetcher +// for the chunk retrieval +func TestNetStoreMultipleGetAndPut(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + putErrC := make(chan error) + go func() { + // sleep to make sure Put is called after all the Get + time.Sleep(500 * time.Millisecond) + // check if netStore created exactly one fetcher for all Get calls + if netStore.fetchers.Len() != 1 { + putErrC <- errors.New("Expected netStore to use one fetcher for all Get calls") + return + } + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) + if err != nil { + putErrC <- fmt.Errorf("Expected no err got %v", err) + return + } + putErrC <- nil + }() + + count := 4 + // call Get 4 times for the same unavailable chunk. The calls will be blocked until the Put above. 
+ errC := make(chan error) + for i := 0; i < count; i++ { + go func() { + recChunk, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) + if err != nil { + errC <- fmt.Errorf("Expected no err got %v", err) + } + if !bytes.Equal(recChunk.Address(), ch.Address()) || !bytes.Equal(recChunk.Data(), ch.Data()) { + errC <- errors.New("Different chunk received than what was put") + } + errC <- nil + }() + } + + if err := <-putErrC; err != nil { + t.Fatal(err) + } + + timeout := time.After(1 * time.Second) + + // The Get calls should return after Put, so no timeout expected + for i := 0; i < count; i++ { + select { + case err := <-errC: + if err != nil { + t.Fatal(err) + } + case <-timeout: + t.Fatalf("Timeout waiting for Get calls to return") + } + } + + // A fetcher was created, check if it has been removed after delivery + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } + + // Check if the fetcher context has been cancelled after delivery + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } + +} + +// TestNetStoreFetchFuncTimeout tests a FetchFunc call for an unavailable chunk and waits for timeout +func TestNetStoreFetchFuncTimeout(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + chunk := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // FetchFunc is called for an unavailable chunk, so the returned wait function should not be nil + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should be an active fetcher for the chunk after the FetchFunc call + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // wait function should 
timeout because we don't deliver the chunk with a Put + err := wait(ctx) + if err != context.DeadlineExceeded { + t.Fatalf("Expected context.DeadLineExceeded err got %v", err) + } + + // the fetcher should be removed after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after timeout") + } + + // the fetcher context should be cancelled after timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreFetchFuncAfterPut tests that the FetchFunc should return nil for a locally available chunk +func TestNetStoreFetchFuncAfterPut(t *testing.T) { + netStore, _, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // We deliver the created chunk with a Put + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + + // FetchFunc should return nil, because the chunk is available locally, no need to fetch it + wait := netStore.FetchFunc(ctx, ch.Address()) + if wait != nil { + t.Fatal("Expected wait to be nil") + } + + // No fetchers should be created at all + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to not have fetcher") + } +} + +// TestNetStoreGetCallsRequest tests if Get created a request on the NetFetcher for an unavailable chunk +func TestNetStoreGetCallsRequest(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + ctx := context.WithValue(context.Background(), "hopcount", uint8(5)) + ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer cancel() + + // We call get for a not available chunk, it will timeout because the chunk is not delivered + _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) + + if err != 
context.DeadlineExceeded { + t.Fatalf("Expected context.DeadlineExceeded err got %v", err) + } + + // NetStore should call NetFetcher.Request and wait for the chunk + if !fetcher.requestCalled { + t.Fatal("Expected NetFetcher.Request to be called") + } + + if fetcher.hopCounts[0] != 5 { + t.Fatalf("Expected NetFetcher.Request be called with hopCount 5, got %v", fetcher.hopCounts[0]) + } +} + +// TestNetStoreGetCallsOffer tests if Get created a request on the NetFetcher for an unavailable chunk +// in case of a source peer provided in the context. +func TestNetStoreGetCallsOffer(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + // If a source peer is added to the context, NetStore will handle it as an offer + ctx := context.WithValue(context.Background(), "source", sourcePeerID.String()) + ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) + defer cancel() + + // We call get for a not available chunk, it will timeout because the chunk is not delivered + _, err := netStore.Get(ctx, chunk.ModeGetRequest, ch.Address()) + + if err != context.DeadlineExceeded { + t.Fatalf("Expect error %v got %v", context.DeadlineExceeded, err) + } + + // NetStore should call NetFetcher.Offer with the source peer + if !fetcher.offerCalled { + t.Fatal("Expected NetFetcher.Request to be called") + } + + if len(fetcher.sources) != 1 { + t.Fatalf("Expected fetcher sources length 1 got %v", len(fetcher.sources)) + } + + if fetcher.sources[0].String() != sourcePeerID.String() { + t.Fatalf("Expected fetcher source %v got %v", sourcePeerID, fetcher.sources[0]) + } + +} + +// TestNetStoreFetcherCountPeers tests multiple NetStore.Get calls with peer in the context. 
+// There is no Put call, so the Get calls timeout +func TestNetStoreFetcherCountPeers(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + addr := randomAddr() + peers := []string{randomAddr().Hex(), randomAddr().Hex(), randomAddr().Hex()} + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + errC := make(chan error) + nrGets := 3 + + // Call Get 3 times with a peer in context + for i := 0; i < nrGets; i++ { + peer := peers[i] + go func() { + ctx := context.WithValue(ctx, "peer", peer) + _, err := netStore.Get(ctx, chunk.ModeGetRequest, addr) + errC <- err + }() + } + + // All 3 Get calls should timeout + for i := 0; i < nrGets; i++ { + err := <-errC + if err != context.DeadlineExceeded { + t.Fatalf("Expected \"%v\" error got \"%v\"", context.DeadlineExceeded, err) + } + } + + // fetcher should be closed after timeout + select { + case <-fetcher.quit: + case <-time.After(3 * time.Second): + t.Fatalf("mockNetFetcher not closed after timeout") + } + + // All 3 peers should be given to NetFetcher after the 3 Get calls + if len(fetcher.peersPerRequest) != nrGets { + t.Fatalf("Expected 3 got %v", len(fetcher.peersPerRequest)) + } + + for i, peers := range fetcher.peersPerRequest { + if len(peers) < i+1 { + t.Fatalf("Expected at least %v got %v", i+1, len(peers)) + } + } +} + +// TestNetStoreFetchFuncCalledMultipleTimes calls the wait function given by FetchFunc three times, +// and checks there is still exactly one fetcher for one chunk. After the chunk is delivered, it checks +// if the fetcher is closed. 
+func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + ch := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + // FetchFunc should return a non-nil wait function, because the chunk is not available + wait := netStore.FetchFunc(ctx, ch.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should be exactly one fetcher for the chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // Call wait three times in parallel + count := 3 + errC := make(chan error) + for i := 0; i < count; i++ { + go func() { + errC <- wait(ctx) + }() + } + + // sleep a little so the wait functions are called above + time.Sleep(100 * time.Millisecond) + + // there should be still only one fetcher, because all wait calls are for the same chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(ch.Address()) == nil { + t.Fatal("Expected netStore to have one fetcher for the requested chunk") + } + + // Deliver the chunk with a Put + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) + if err != nil { + t.Fatalf("Expected no err got %v", err) + } + + // wait until all wait calls return (because the chunk is delivered) + for i := 0; i < count; i++ { + err := <-errC + if err != nil { + t.Fatal(err) + } + } + + // There should be no more fetchers for the delivered chunk + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } + + // The context for the fetcher should be cancelled after delivery + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +// TestNetStoreFetcherLifeCycleWithTimeout is similar to TestNetStoreFetchFuncCalledMultipleTimes, +// the 
only difference is that we don't deliver the chunk, just wait for timeout +func TestNetStoreFetcherLifeCycleWithTimeout(t *testing.T) { + netStore, fetcher, cleanup := newTestNetStore(t) + defer cleanup() + + chunk := GenerateRandomChunk(chunk.DefaultSize) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + // FetchFunc should return a non-nil wait function, because the chunk is not available + wait := netStore.FetchFunc(ctx, chunk.Address()) + if wait == nil { + t.Fatal("Expected wait function to be not nil") + } + + // There should be exactly one fetcher for the chunk + if netStore.fetchers.Len() != 1 || netStore.getFetcher(chunk.Address()) == nil { + t.Fatalf("Expected netStore to have one fetcher for the requested chunk") + } + + // Call wait three times in parallel + count := 3 + errC := make(chan error) + for i := 0; i < count; i++ { + go func() { + rctx, rcancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer rcancel() + err := wait(rctx) + if err != context.DeadlineExceeded { + errC <- fmt.Errorf("Expected err %v got %v", context.DeadlineExceeded, err) + return + } + errC <- nil + }() + } + + // wait until all wait calls timeout + for i := 0; i < count; i++ { + err := <-errC + if err != nil { + t.Fatal(err) + } + } + + // There should be no more fetchers after timeout + if netStore.fetchers.Len() != 0 { + t.Fatal("Expected netStore to remove the fetcher after delivery") + } + + // The context for the fetcher should be cancelled after timeout + select { + case <-fetcher.ctx.Done(): + default: + t.Fatal("Expected fetcher context to be cancelled") + } +} + +func randomAddr() Address { + addr := make([]byte, 32) + rand.Read(addr) + return Address(addr) +} diff --git a/storage/request.go b/storage/request.go deleted file mode 100644 index 47bb66cb19..0000000000 --- a/storage/request.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the 
go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package storage - -import ( - "sync" - "time" - - "github.com/ethersphere/swarm/network/timeouts" - - "github.com/ethereum/go-ethereum/p2p/enode" -) - -// Request encapsulates all the necessary arguments when making a request to NetStore. -// These could have also been added as part of the interface of NetStore.Get, but a request struct seemed -// like a better option -type Request struct { - Addr Address // chunk address - Origin enode.ID // who is sending us that request? we compare Origin to the suggested peer from RequestFromPeers - PeersToSkip sync.Map // peers not to request chunk from -} - -// NewRequest returns a new instance of Request based on chunk address skip check and -// a map of peers to skip. -func NewRequest(addr Address) *Request { - return &Request{ - Addr: addr, - } -} - -// SkipPeer returns if the peer with nodeID should not be requested to deliver a chunk. -// Peers to skip are kept per Request and for a time period of FailedPeerSkipDelay. 
-func (r *Request) SkipPeer(nodeID string) bool { - val, ok := r.PeersToSkip.Load(nodeID) - if !ok { - return false - } - t, ok := val.(time.Time) - if ok && time.Now().After(t.Add(timeouts.FailedPeerSkipDelay)) { - r.PeersToSkip.Delete(nodeID) - return false - } - return true -} diff --git a/swarm.go b/swarm.go index d6f40beed3..1bdaa63bfe 100644 --- a/swarm.go +++ b/swarm.go @@ -64,6 +64,7 @@ var ( startCounter = metrics.NewRegisteredCounter("stack,start", nil) stopCounter = metrics.NewRegisteredCounter("stack,stop", nil) uptimeGauge = metrics.NewRegisteredGauge("stack.uptime", nil) + requestsCacheGauge = metrics.NewRegisteredGauge("storage.cache.requests.size", nil) ) // the swarm stack @@ -173,15 +174,17 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e feedsHandler, ) - nodeID := config.Enode.ID() - self.netStore = storage.NewNetStore(lstore, nodeID) + self.netStore, err = storage.NewNetStore(lstore, nil) + if err != nil { + return nil, err + } to := network.NewKademlia( common.FromHex(config.BzzKey), network.NewKadParams(), ) delivery := stream.NewDelivery(to, self.netStore) - self.netStore.RemoteGet = delivery.RequestFromPeers + self.netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, config.DeliverySkipCheck).New feedsHandler.SetStore(self.netStore) @@ -194,6 +197,8 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e self.accountingMetrics = protocols.SetupAccountingMetrics(10*time.Second, filepath.Join(config.Path, "metrics.db")) } + nodeID := config.Enode.ID() + syncing := stream.SyncingAutoSubscribe if !config.SyncEnabled || config.LightNodeEnabled { syncing = stream.SyncingDisabled @@ -209,8 +214,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e tags := chunk.NewTags() //todo load from state store // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage - lnetStore := storage.NewLNetStore(self.netStore) - 
self.fileStore = storage.NewFileStore(lnetStore, self.config.FileStoreParams, tags) + self.fileStore = storage.NewFileStore(self.netStore, self.config.FileStoreParams, tags) log.Debug("Setup local storage") @@ -407,6 +411,7 @@ func (s *Swarm) Start(srv *p2p.Server) error { select { case <-time.After(updateGaugesPeriod): uptimeGauge.Update(time.Since(startTime).Nanoseconds()) + requestsCacheGauge.Update(int64(s.netStore.RequestsCacheLen())) case <-doneC: return } diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go deleted file mode 100644 index 97a1aa4bb3..0000000000 --- a/vendor/golang.org/x/sync/singleflight/singleflight.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package singleflight provides a duplicate function call suppression -// mechanism. -package singleflight // import "golang.org/x/sync/singleflight" - -import "sync" - -// call is an in-flight or completed singleflight.Do call -type call struct { - wg sync.WaitGroup - - // These fields are written once before the WaitGroup is done - // and are only read after the WaitGroup is done. - val interface{} - err error - - // forgotten indicates whether Forget was called with this call's key - // while the call was still in flight. - forgotten bool - - // These fields are read and written with the singleflight - // mutex held before the WaitGroup is done, and are read but - // not written after the WaitGroup is done. - dups int - chans []chan<- Result -} - -// Group represents a class of work and forms a namespace in -// which units of work can be executed with duplicate suppression. -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized -} - -// Result holds the results of Do, so they can be passed -// on a channel. 
-type Result struct { - Val interface{} - Err error - Shared bool -} - -// Do executes and returns the results of the given function, making -// sure that only one execution is in-flight for a given key at a -// time. If a duplicate comes in, the duplicate caller waits for the -// original to complete and receives the same results. -// The return value shared indicates whether v was given to multiple callers. -func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - g.mu.Unlock() - c.wg.Wait() - return c.val, c.err, true - } - c := new(call) - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - g.doCall(c, key, fn) - return c.val, c.err, c.dups > 0 -} - -// DoChan is like Do but returns a channel that will receive the -// results when they are ready. -func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { - ch := make(chan Result, 1) - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - c.chans = append(c.chans, ch) - g.mu.Unlock() - return ch - } - c := &call{chans: []chan<- Result{ch}} - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - go g.doCall(c, key, fn) - - return ch -} - -// doCall handles the single call for a key. -func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - c.val, c.err = fn() - c.wg.Done() - - g.mu.Lock() - if !c.forgotten { - delete(g.m, key) - } - for _, ch := range c.chans { - ch <- Result{c.val, c.err, c.dups > 0} - } - g.mu.Unlock() -} - -// Forget tells the singleflight to forget about a key. Future calls -// to Do for this key will call the function rather than waiting for -// an earlier call to complete. 
-func (g *Group) Forget(key string) { - g.mu.Lock() - if c, ok := g.m[key]; ok { - c.forgotten = true - } - delete(g.m, key) - g.mu.Unlock() -} diff --git a/vendor/vendor.json b/vendor/vendor.json index 2dcfbba3fd..06dd683cdd 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1240,12 +1240,6 @@ "revision": "eb5bcb51f2a31c7d5141d810b70815c05d9c9146", "revisionTime": "2019-04-03T01:06:53Z" }, - { - "checksumSHA1": "FuQoDr6zh5GsiVVyo3oDZcUVC3c=", - "path": "golang.org/x/sync/singleflight", - "revision": "112230192c580c3556b8cee6403af37a4fc5f28c", - "revisionTime": "2019-04-22T22:11:18Z" - }, { "checksumSHA1": "4TEYFKrAUuwBMqExjQBsnf/CgjQ=", "path": "golang.org/x/sync/syncmap",