simple fetchers (#1492)

* network: disable shouldNOTRequestAgain
* vendor singleflight
* network, storage: fix tests
* network/storage: remove HopCount and SkipCheck
* network/stream: use localstore when providing chunks a node has offered
* storage: refactor lnetstore
* storage: rename FetcherItem to Fetcher
* storage/feed: no distinction between a catastrophic error and chunk not found
* network/stream: remove TestDeliveryFromNodes, as FindPeer is changed, and Swarm connectivity is not a chain
* network/stream: fix intervals tests
* swarm: fixes for linter
* storage: use LRU cache for fetchers (a sketch of the singleflight + LRU fetcher pattern follows this list)
* network, storage: better godoc
* storage/netstore: explicit errors
* storage/feed/lookup: Clarify ReadFunc expected return error values
* storage: address comments by elad
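The "vendor singleflight" and "use LRU cache for fetchers" bullets describe the core idea behind the simplified fetchers: concurrent requests for the same chunk share a single in-flight fetch, and fetchers (each carrying a Delivered channel, as seen in the hunks below) are kept in a bounded LRU, presumably keyed by chunk reference. The following is a minimal, self-contained sketch of that pattern using golang.org/x/sync/singleflight and github.com/hashicorp/golang-lru; it illustrates the approach only, is not the NetStore code introduced by this commit, and every name in it is hypothetical.

package main

import (
    "fmt"
    "sync"
    "time"

    lru "github.com/hashicorp/golang-lru"
    "golang.org/x/sync/singleflight"
)

// fetcher is a hypothetical stand-in for the Fetcher this commit introduces
// (renamed from FetcherItem): Delivered is closed once the chunk arrives.
type fetcher struct {
    Delivered chan struct{}
    once      sync.Once
}

func (f *fetcher) deliver() { f.once.Do(func() { close(f.Delivered) }) }

// fetcherPool deduplicates fetches per chunk reference and keeps a bounded
// number of fetchers around, mirroring the singleflight + LRU idea.
type fetcherPool struct {
    sf    singleflight.Group
    cache *lru.Cache
}

func newFetcherPool(size int) (*fetcherPool, error) {
    c, err := lru.New(size)
    if err != nil {
        return nil, err
    }
    return &fetcherPool{cache: c}, nil
}

// get returns the fetcher for ref, creating it at most once even under
// concurrent callers, and remembering it in the LRU.
func (p *fetcherPool) get(ref string) *fetcher {
    v, _, _ := p.sf.Do(ref, func() (interface{}, error) {
        if f, ok := p.cache.Get(ref); ok {
            return f, nil
        }
        f := &fetcher{Delivered: make(chan struct{})}
        p.cache.Add(ref, f)
        return f, nil
    })
    return v.(*fetcher)
}

func main() {
    pool, _ := newFetcherPool(128)
    f := pool.get("chunk-ref")
    go func() { time.Sleep(10 * time.Millisecond); f.deliver() }()
    <-f.Delivered
    fmt.Println("delivered")
}

A caller obtains the fetcher for a reference and either finds Delivered already closed or blocks on it (typically with a timeout, as the syncer hunks below do).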
Anton Evangelatov
2019-06-18 08:47:27 +02:00
committed by acud
parent f57d4f0802
commit d589af14a8
23 changed files with 746 additions and 2246 deletions

View File

@@ -56,7 +56,6 @@ var (
bucketKeyStore = simulation.BucketKey("store")
bucketKeyFileStore = simulation.BucketKey("filestore")
bucketKeyNetStore = simulation.BucketKey("netstore")
bucketKeyDelivery = simulation.BucketKey("delivery")
bucketKeyRegistry = simulation.BucketKey("registry")
@@ -81,7 +80,7 @@ func newNetStoreAndDelivery(ctx *adapters.ServiceContext, bucket *sync.Map) (*ne
return nil, nil, nil, nil, err
}
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
netStore.RemoteGet = delivery.RequestFromPeers
return addr, netStore, delivery, cleanup, nil
}
@@ -93,13 +92,13 @@ func newNetStoreAndDeliveryWithBzzAddr(ctx *adapters.ServiceContext, bucket *syn
return nil, nil, nil, err
}
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
netStore.RemoteGet = delivery.RequestFromPeers
return netStore, delivery, cleanup, nil
}
// newNetStoreAndDeliveryWithRequestFunc is a constructor for NetStore and Delivery, used in Simulations, accepting any NetStore.RequestFunc
func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf network.RequestFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket *sync.Map, rf storage.RemoteGetFunc) (*network.BzzAddr, *storage.NetStore, *Delivery, func(), error) {
addr := network.NewAddr(ctx.Config.Node())
netStore, delivery, cleanup, err := netStoreAndDeliveryWithAddr(ctx, bucket, addr)
@@ -107,7 +106,7 @@ func newNetStoreAndDeliveryWithRequestFunc(ctx *adapters.ServiceContext, bucket
return nil, nil, nil, nil, err
}
netStore.NewNetFetcherFunc = network.NewFetcherFactory(rf, true).New
netStore.RemoteGet = rf
return addr, netStore, delivery, cleanup, nil
}
@@ -120,14 +119,9 @@ func netStoreAndDeliveryWithAddr(ctx *adapters.ServiceContext, bucket *sync.Map,
return nil, nil, nil, err
}
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
localStore.Close()
localStoreCleanup()
return nil, nil, nil, err
}
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams(), chunk.NewTags())
netStore := storage.NewNetStore(localStore, enode.ID{})
lnetStore := storage.NewLNetStore(netStore)
fileStore := storage.NewFileStore(lnetStore, storage.NewFileStoreParams(), chunk.NewTags())
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
delivery := NewDelivery(kad, netStore)
@@ -167,15 +161,11 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
return nil, nil, nil, nil, err
}
netStore, err := storage.NewNetStore(localStore, nil)
if err != nil {
localStore.Close()
removeDataDir()
return nil, nil, nil, nil, err
}
netStore := storage.NewNetStore(localStore, enode.ID{})
delivery := NewDelivery(to, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
netStore.RemoteGet = delivery.RequestFromPeers
intervalsStore := state.NewInmemoryStore()
streamer := NewRegistry(addr.ID(), delivery, netStore, intervalsStore, registryOptions, nil)

View File

@@ -27,9 +27,9 @@ import (
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
"github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/spancontext"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/tracing"
opentracing "github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
)
@@ -39,9 +39,6 @@ var (
handleRetrieveRequestMsgCount = metrics.NewRegisteredCounter("network.stream.handle_retrieve_request_msg.count", nil)
retrieveChunkFail = metrics.NewRegisteredCounter("network.stream.retrieve_chunks_fail.count", nil)
requestFromPeersCount = metrics.NewRegisteredCounter("network.stream.request_from_peers.count", nil)
requestFromPeersEachCount = metrics.NewRegisteredCounter("network.stream.request_from_peers_each.count", nil)
lastReceivedChunksMsg = metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)
)
@@ -62,52 +59,42 @@ func NewDelivery(kad *network.Kademlia, netStore *storage.NetStore) *Delivery {
// RetrieveRequestMsg is the protocol msg for chunk retrieve requests
type RetrieveRequestMsg struct {
Addr storage.Address
SkipCheck bool
HopCount uint8
Addr storage.Address
}
func (d *Delivery) handleRetrieveRequestMsg(ctx context.Context, sp *Peer, req *RetrieveRequestMsg) error {
log.Trace("received request", "peer", sp.ID(), "hash", req.Addr)
log.Trace("handle retrieve request", "peer", sp.ID(), "hash", req.Addr)
handleRetrieveRequestMsgCount.Inc(1)
var osp opentracing.Span
ctx, osp = spancontext.StartSpan(
ctx, osp := spancontext.StartSpan(
ctx,
"stream.handle.retrieve")
"handle.retrieve.request")
osp.LogFields(olog.String("ref", req.Addr.String()))
var cancel func()
// TODO: do something with this hardcoded timeout, maybe use TTL in the future
ctx = context.WithValue(ctx, "peer", sp.ID().String())
ctx = context.WithValue(ctx, "hopcount", req.HopCount)
ctx, cancel = context.WithTimeout(ctx, network.RequestTimeout)
defer osp.Finish()
go func() {
select {
case <-ctx.Done():
case <-d.quit:
}
cancel()
}()
ctx, cancel := context.WithTimeout(ctx, timeouts.FetcherGlobalTimeout)
defer cancel()
go func() {
defer osp.Finish()
ch, err := d.netStore.Get(ctx, chunk.ModeGetRequest, req.Addr)
if err != nil {
retrieveChunkFail.Inc(1)
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "hopcount", req.HopCount, "err", err)
return
}
syncing := false
r := &storage.Request{
Addr: req.Addr,
Origin: sp.ID(),
}
chunk, err := d.netStore.Get(ctx, chunk.ModeGetRequest, r)
if err != nil {
retrieveChunkFail.Inc(1)
log.Debug("ChunkStore.Get can not retrieve chunk", "peer", sp.ID().String(), "addr", req.Addr, "err", err)
return nil
}
err = sp.Deliver(ctx, ch, Top, syncing)
if err != nil {
log.Warn("ERROR in handleRetrieveRequestMsg", "err", err)
}
osp.LogFields(olog.Bool("delivered", true))
}()
log.Trace("retrieve request, delivery", "ref", req.Addr, "peer", sp.ID())
syncing := false
err = sp.Deliver(ctx, chunk, 0, syncing)
if err != nil {
log.Error("sp.Deliver errored", "err", err)
}
osp.LogFields(olog.Bool("delivered", true))
return nil
}
@@ -189,57 +176,166 @@ func (d *Delivery) Close() {
close(d.quit)
}
// RequestFromPeers sends a chunk retrieve request to a peer
// The most eligible peer that hasn't already been sent to is chosen
// TODO: define "eligible"
func (d *Delivery) RequestFromPeers(ctx context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
requestFromPeersCount.Inc(1)
var sp *Peer
spID := req.Source
// getOriginPo returns the originPo if the incoming Request has an Origin;
// if our node is the first node that requests this chunk, then we don't have an Origin
// and -1 is returned.
// This is used only for tracing, and can probably be refactored so that we don't have to
// iterate over Kademlia.
func (d *Delivery) getOriginPo(req *storage.Request) int {
originPo := -1
if spID != nil {
sp = d.getPeer(*spID)
if sp == nil {
return nil, nil, fmt.Errorf("source peer %v not found", spID.String())
}
} else {
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
id := p.ID()
if p.LightNode {
// skip light nodes
return true
}
if req.SkipPeer(id.String()) {
log.Trace("Delivery.RequestFromPeers: skip peer", "peer id", id)
return true
}
sp = d.getPeer(id)
// sp is nil, when we encounter a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol
if sp == nil {
return true
}
spID = &id
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
id := p.ID()
// get po between chunk and origin
if req.Origin.String() == id.String() {
originPo = po
return false
})
if sp == nil {
return nil, nil, errors.New("no peer found")
}
return true
})
return originPo
}
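getOriginPo and FindPeer reason in terms of proximity order (po): roughly, the number of leading bits two addresses share, so a higher po means the addresses are closer under Kademlia's XOR metric. The snippet below is a simplified, self-contained stand-in for that measure, added only as a reading aid for the po comparisons that follow; swarm's chunk.Proximity is the authoritative implementation.

package main

import "fmt"

// proximity returns the number of leading bits two equal-length addresses
// share -- a simplified stand-in for the po values (originPo, myPo, po)
// compared in getOriginPo and FindPeer.
func proximity(a, b []byte) int {
    for i := range a {
        x := a[i] ^ b[i]
        if x == 0 {
            continue
        }
        for bit := 7; bit >= 0; bit-- {
            if x&(1<<uint(bit)) != 0 {
                return i*8 + (7 - bit)
            }
        }
    }
    return len(a) * 8 // identical addresses
}

func main() {
    chunkAddr := []byte{0xa0, 0x00} // 1010 0000 ...
    nearPeer := []byte{0xaf, 0xff}  // shares 4 leading bits -> po 4
    farPeer := []byte{0x20, 0x00}   // differs in the first bit -> po 0
    fmt.Println(proximity(chunkAddr, nearPeer), proximity(chunkAddr, farPeer))
}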
// FindPeer returns the closest peer from Kademlia to which a chunk
// request has not already been sent
func (d *Delivery) FindPeer(ctx context.Context, req *storage.Request) (*Peer, error) {
var sp *Peer
var err error
osp, _ := ctx.Value("remote.fetch").(opentracing.Span)
// originPo - proximity of the node that made the request; -1 if the request originator is our node;
// myPo - this node's proximity with the requested chunk
// selectedPeerPo - kademlia suggested node's proximity with the requested chunk (computed further below)
originPo := d.getOriginPo(req)
myPo := chunk.Proximity(req.Addr, d.kad.BaseAddr())
selectedPeerPo := -1
depth := d.kad.NeighbourhoodDepth()
if osp != nil {
osp.LogFields(olog.Int("originPo", originPo))
osp.LogFields(olog.Int("depth", depth))
osp.LogFields(olog.Int("myPo", myPo))
}
// do not forward requests if origin proximity is bigger than our node's proximity
// this means that origin is closer to the chunk
if originPo > myPo {
return nil, errors.New("not forwarding request, origin node is closer to chunk than this node")
}
d.kad.EachConn(req.Addr[:], 255, func(p *network.Peer, po int) bool {
id := p.ID()
// skip light nodes
if p.LightNode {
return true
}
// do not send request back to peer who asked us. maybe merge with SkipPeer at some point
if req.Origin.String() == id.String() {
return true
}
// skip peers that we have already tried
if req.SkipPeer(id.String()) {
log.Trace("findpeer skip peer", "peer", id, "ref", req.Addr.String())
return true
}
if myPo < depth { // chunk is NOT within the neighbourhood
if po <= myPo { // always choose a peer strictly closer to chunk than us
log.Trace("findpeer1a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
return false
} else {
log.Trace("findpeer1b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
}
} else { // chunk IS WITHIN neighbourhood
if po < depth { // do not select peer outside the neighbourhood. But allows peers further from the chunk than us
log.Trace("findpeer2a", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
return false
} else if po <= originPo { // avoid loop in neighbourhood, so not forward when a request comes from the neighbourhood
log.Trace("findpeer2b", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
return false
} else {
log.Trace("findpeer2c", "originpo", originPo, "mypo", myPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
}
}
// if the selected peer is not within depth (2nd condition; if depth <= po, then the peer is in the nearest neighbourhood)
// and has a lower po than ours, return an error
if po < myPo && depth > po {
log.Trace("findpeer4 skip peer because origin was closer", "originpo", originPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
err = fmt.Errorf("not asking peers further away from origin; ref=%s originpo=%v po=%v depth=%v myPo=%v", req.Addr.String(), originPo, po, depth, myPo)
return false
}
// if chunk falls in our nearest neighbourhood (1st condition), but suggested peer is not in
// the nearest neighbourhood (2nd condition), don't forward the request to suggested peer
if depth <= myPo && depth > po {
log.Trace("findpeer5 skip peer because depth", "originpo", originPo, "po", po, "depth", depth, "peer", id, "ref", req.Addr.String())
err = fmt.Errorf("not going outside of depth; ref=%s originpo=%v po=%v depth=%v myPo=%v", req.Addr.String(), originPo, po, depth, myPo)
return false
}
sp = d.getPeer(id)
// sp could be nil, if we encountered a peer that is not registered for delivery, i.e. doesn't support the `stream` protocol
// if sp is not nil, then we have selected the next peer and we stop iterating
// if sp is nil, we continue iterating
if sp != nil {
selectedPeerPo = po
return false
}
// continue iterating
return true
})
if osp != nil {
osp.LogFields(olog.Int("selectedPeerPo", selectedPeerPo))
}
if err != nil {
return nil, err
}
if sp == nil {
return nil, errors.New("no peer found")
}
return sp, nil
}
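The heart of FindPeer is the set of comparisons between originPo, myPo, the candidate peer's po and the Kademlia depth. The self-contained sketch below restates those conditions as a pure function over the four integers, purely as a reading aid under the same variable names; it is not the actual implementation, which also filters light nodes, already-tried peers and peers without the `stream` protocol, and stops iterating once a suitable peer is found.

package main

import "fmt"

// forwardDecision restates the po/depth comparisons from FindPeer: should a
// candidate peer with proximity `po` to the chunk be asked next?
func forwardDecision(originPo, myPo, po, depth int) (selectPeer bool, reason string) {
    if originPo > myPo {
        return false, "origin is closer to the chunk than this node; do not forward"
    }
    if myPo < depth {
        // chunk is NOT within our neighbourhood: only forward to a peer
        // strictly closer to the chunk than we are
        if po <= myPo {
            return false, "peer is no closer to the chunk than we are"
        }
        return true, "peer is strictly closer to the chunk"
    }
    // chunk IS within our neighbourhood: stay inside the neighbourhood and
    // avoid looping back towards the request origin
    if po < depth {
        return false, "peer is outside the neighbourhood"
    }
    if po <= originPo {
        return false, "would loop back towards the request origin"
    }
    return true, "peer is inside the neighbourhood and closer to the chunk than the origin"
}

func main() {
    fmt.Println(forwardDecision(-1, 3, 5, 8)) // chunk outside neighbourhood, closer peer: forward
    fmt.Println(forwardDecision(-1, 3, 2, 8)) // peer no closer than us: skip
    fmt.Println(forwardDecision(2, 9, 6, 8))  // chunk in neighbourhood, peer outside depth: skip
}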
// RequestFromPeers sends a chunk retrieve request to the next found peer
func (d *Delivery) RequestFromPeers(ctx context.Context, req *storage.Request, localID enode.ID) (*enode.ID, error) {
metrics.GetOrRegisterCounter("delivery.requestfrompeers", nil).Inc(1)
sp, err := d.FindPeer(ctx, req)
if err != nil {
log.Trace(err.Error())
return nil, err
}
// setting this value in the context creates a new span that can persist across the sendpriority queue and the network roundtrip
// this span will finish only when delivery is handled (or times out)
ctx = context.WithValue(ctx, tracing.StoreLabelId, "stream.send.request")
ctx = context.WithValue(ctx, tracing.StoreLabelMeta, fmt.Sprintf("%v.%v", sp.ID(), req.Addr))
log.Trace("request.from.peers", "peer", sp.ID(), "ref", req.Addr)
err := sp.SendPriority(ctx, &RetrieveRequestMsg{
Addr: req.Addr,
SkipCheck: req.SkipCheck,
HopCount: req.HopCount,
}, Top)
if err != nil {
return nil, nil, err
r := &RetrieveRequestMsg{
Addr: req.Addr,
}
log.Trace("sending retrieve request", "ref", r.Addr, "peer", sp.ID().String(), "origin", localID)
err = sp.Send(ctx, r)
if err != nil {
log.Error(err.Error())
return nil, err
}
requestFromPeersEachCount.Inc(1)
return spID, sp.quit, nil
spID := sp.ID()
return &spID, nil
}

View File

@@ -19,26 +19,17 @@ package stream
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
pq "github.com/ethersphere/swarm/network/priorityqueue"
"github.com/ethersphere/swarm/network/simulation"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil"
)
// Test requesting a chunk from a peer, then issuing an "empty" OfferedHashesMsg (no hashes available yet)
@@ -160,18 +151,12 @@ func TestRequestFromPeers(t *testing.T) {
streamer: r,
}
r.setPeer(sp)
req := network.NewRequest(
storage.Address(hash0[:]),
true,
&sync.Map{},
)
ctx := context.Background()
id, _, err := delivery.RequestFromPeers(ctx, req)
req := storage.NewRequest(storage.Address(hash0[:]))
id, err := delivery.FindPeer(context.TODO(), req)
if err != nil {
t.Fatal(err)
}
if *id != dummyPeerID {
if id.ID() != dummyPeerID {
t.Fatalf("Expected an id, got %v", id)
}
}
@@ -201,15 +186,10 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
}
r.setPeer(sp)
req := network.NewRequest(
storage.Address(hash0[:]),
true,
&sync.Map{},
)
req := storage.NewRequest(storage.Address(hash0[:]))
ctx := context.Background()
// making a request which should return with "no peer found"
_, _, err := delivery.RequestFromPeers(ctx, req)
_, err := delivery.FindPeer(context.TODO(), req)
expectedError := "no peer found"
if err.Error() != expectedError {
@@ -300,293 +280,3 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
}
}
func TestDeliveryFromNodes(t *testing.T) {
testDeliveryFromNodes(t, 2, dataChunkCount, true)
testDeliveryFromNodes(t, 2, dataChunkCount, false)
testDeliveryFromNodes(t, 4, dataChunkCount, true)
testDeliveryFromNodes(t, 4, dataChunkCount, false)
if testutil.RaceEnabled {
// Travis cannot handle more nodes with -race; would time out.
return
}
testDeliveryFromNodes(t, 8, dataChunkCount, true)
testDeliveryFromNodes(t, 8, dataChunkCount, false)
testDeliveryFromNodes(t, 16, dataChunkCount, true)
testDeliveryFromNodes(t, 16, dataChunkCount, false)
}
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
t.Helper()
t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)
cleanup = func() {
r.Close()
clean()
}
return r, cleanup, nil
},
})
defer sim.Close()
log.Info("Adding nodes to simulation")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
t.Fatal(err)
}
log.Info("Starting simulation")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
//determine the pivot node to be the first node of the simulation
pivot := nodeIDs[0]
//distribute chunks of a random file into Stores of nodes 1 to nodes
//we will do this by creating a file store with an underlying round-robin store:
//the file store will create a hash for the uploaded file, but every chunk will be
//distributed to different nodes via round-robin scheduling
log.Debug("Writing file to round-robin file store")
//to do this, we create an array for chunkstores (length minus one, the pivot node)
stores := make([]storage.ChunkStore, len(nodeIDs)-1)
//we then need to get all stores from the sim....
lStores := sim.NodesItems(bucketKeyStore)
i := 0
//...iterate the buckets...
for id, bucketVal := range lStores {
//...and remove the one which is the pivot node
if id == pivot {
continue
}
//the other ones are added to the array...
stores[i] = bucketVal.(storage.ChunkStore)
i++
}
//...which then gets passed to the round-robin file store
roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags())
//now we can actually upload a (random) file to the round-robin store
size := chunkCount * chunkSize
log.Debug("Storing data to file store")
fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
// wait until all chunks stored
if err != nil {
return err
}
err = wait(ctx)
if err != nil {
return err
}
//get the pivot node's filestore
item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
pivotFileStore := item.(*storage.FileStore)
log.Debug("Starting retrieval routine")
retErrC := make(chan error)
go func() {
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
n, err := readAll(pivotFileStore, fileHash)
log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
retErrC <- err
}()
disconnected := watchDisconnections(ctx, sim)
defer func() {
if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
}
}()
//finally check that the pivot node gets all chunks via the root hash
log.Debug("Check retrieval")
success := true
var total int64
total, err = readAll(pivotFileStore, fileHash)
if err != nil {
return err
}
log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
if err != nil || total != int64(size) {
success = false
}
if !success {
return fmt.Errorf("Test failed, chunks not available on all nodes")
}
if err := <-retErrC; err != nil {
return fmt.Errorf("requesting chunks: %v", err)
}
log.Debug("Test terminated successfully")
return nil
})
if result.Error != nil {
t.Fatal(result.Error)
}
})
}
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
for chunks := 32; chunks <= 128; chunks *= 2 {
for i := 2; i < 32; i *= 2 {
b.Run(
fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
func(b *testing.B) {
benchmarkDeliveryFromNodes(b, i, chunks, true)
},
)
}
}
}
func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
for chunks := 32; chunks <= 128; chunks *= 2 {
for i := 2; i < 32; i *= 2 {
b.Run(
fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
func(b *testing.B) {
benchmarkDeliveryFromNodes(b, i, chunks, false)
},
)
}
}
}
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
SyncUpdateDelay: 0,
}, nil)
bucket.Store(bucketKeyRegistry, r)
cleanup = func() {
r.Close()
clean()
}
return r, cleanup, nil
},
})
defer sim.Close()
log.Info("Initializing test config")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
b.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
node := nodeIDs[len(nodeIDs)-1]
item, ok := sim.NodeItem(node, bucketKeyFileStore)
if !ok {
return errors.New("No filestore")
}
remoteFileStore := item.(*storage.FileStore)
pivotNode := nodeIDs[0]
item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
if !ok {
return errors.New("No filestore")
}
netStore := item.(*storage.NetStore)
if _, err := sim.WaitTillHealthy(ctx); err != nil {
return err
}
disconnected := watchDisconnections(ctx, sim)
defer func() {
if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
}
}()
// benchmark loop
b.ResetTimer()
b.StopTimer()
Loop:
for i := 0; i < b.N; i++ {
// uploading chunkCount random chunks to the last node
hashes := make([]storage.Address, chunkCount)
for i := 0; i < chunkCount; i++ {
// create actual size real chunks
ctx := context.TODO()
hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
if err != nil {
return fmt.Errorf("store: %v", err)
}
// wait until all chunks stored
err = wait(ctx)
if err != nil {
return fmt.Errorf("wait store: %v", err)
}
// collect the hashes
hashes[i] = hash
}
// now benchmark the actual retrieval
// netstore.Get is called for each hash in a go routine and errors are collected
b.StartTimer()
errs := make(chan error)
for _, hash := range hashes {
go func(h storage.Address) {
_, err := netStore.Get(ctx, chunk.ModeGetRequest, h)
log.Warn("test check netstore get", "hash", h, "err", err)
errs <- err
}(hash)
}
// count and report retrieval errors
// if there are misses then chunk timeout is too low for the distance and volume (?)
var total, misses int
for err := range errs {
if err != nil {
log.Warn(err.Error())
misses++
}
total++
if total == chunkCount {
break
}
}
b.StopTimer()
if misses > 0 {
err = fmt.Errorf("%v chunk not found out of %v", misses, total)
break Loop
}
}
return err
})
if result.Error != nil {
b.Fatal(result.Error)
}
}

View File

@@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
"github.com/ethersphere/swarm/network/simulation"
"github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil"
@@ -75,7 +76,7 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
return newTestExternalClient(netStore), nil
})
r.RegisterServerFunc(externalStreamName, func(p *Peer, t string, live bool) (Server, error) {
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys, nil), nil
return newTestExternalServer(t, externalStreamSessionAt, externalStreamMaxKeys), nil
})
cleanup := func() {
@@ -298,38 +299,42 @@ func newTestExternalClient(netStore *storage.NetStore) *testExternalClient {
}
}
func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
wait := c.netStore.FetchFunc(ctx, storage.Address(hash))
if wait == nil {
return nil
func (c *testExternalClient) NeedData(ctx context.Context, key []byte) (bool, func(context.Context) error) {
fi, loaded, ok := c.netStore.GetOrCreateFetcher(ctx, key, "syncer")
if !ok {
return loaded, nil
}
select {
case c.hashes <- hash:
case c.hashes <- key:
case <-ctx.Done():
log.Warn("testExternalClient NeedData context", "err", ctx.Err())
return func(_ context.Context) error {
return false, func(_ context.Context) error {
return ctx.Err()
}
}
return wait
return loaded, func(ctx context.Context) error {
select {
case <-fi.Delivered:
case <-time.After(timeouts.SyncerClientWaitTimeout):
return fmt.Errorf("chunk not delivered through syncing after %dsec. ref=%s", timeouts.SyncerClientWaitTimeout, fmt.Sprintf("%x", key))
}
return nil
}
}
func (c *testExternalClient) Close() {}
type testExternalServer struct {
t string
keyFunc func(key []byte, index uint64)
sessionAt uint64
maxKeys uint64
}
func newTestExternalServer(t string, sessionAt, maxKeys uint64, keyFunc func(key []byte, index uint64)) *testExternalServer {
if keyFunc == nil {
keyFunc = binary.BigEndian.PutUint64
}
func newTestExternalServer(t string, sessionAt, maxKeys uint64) *testExternalServer {
return &testExternalServer{
t: t,
keyFunc: keyFunc,
sessionAt: sessionAt,
maxKeys: maxKeys,
}
@@ -345,7 +350,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6
}
b := make([]byte, HashSize*(to-from+1))
for i := from; i <= to; i++ {
s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
binary.BigEndian.PutUint64(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
}
return b, from, to, nil
}

View File

@@ -225,9 +225,14 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
for i := 0; i < lenHashes; i += HashSize {
hash := hashes[i : i+HashSize]
if wait := c.NeedData(ctx, hash); wait != nil {
log.Trace("checking offered hash", "ref", fmt.Sprintf("%x", hash))
if _, wait := c.NeedData(ctx, hash); wait != nil {
ctr++
// set the bit, so create a request
want.Set(i/HashSize, true)
log.Trace("need data", "ref", fmt.Sprintf("%x", hash), "request", true)
// measure how long it takes before we mark chunks for retrieval, and actually send the request
if !wantDelaySet {

View File

@@ -63,8 +63,8 @@ const (
// Tests in this file should not request chunks from peers.
// This function will panic, indicating that there is a problem, if a request has been made.
func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
panic(fmt.Sprintf("unexpected request: address %s, source %s", req.Addr.String(), req.Source.String()))
func dummyRequestFromPeers(_ context.Context, req *storage.Request, _ enode.ID) (*enode.ID, error) {
panic(fmt.Sprintf("unexpected request: address %s", req.Addr.String()))
}
//This test is a syncing test for nodes.

View File

@@ -543,7 +543,7 @@ func (c *client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
NeedData(context.Context, []byte) func(context.Context) error
NeedData(context.Context, []byte) (bool, func(context.Context) error)
Close()
}
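The Client interface change above is the core of the new NeedData contract: it now returns a boolean (named loaded in the syncer client) in addition to the optional wait function, and a nil wait still means the chunk does not need to be requested, which is how handleOfferedHashesMsg uses it elsewhere in this diff. Below is a minimal, self-contained sketch of a client satisfying the new signature; the type names and the literal timeout are hypothetical stand-ins, and loaded is hard-coded to false.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// Client restates the streamer Client interface after this change: NeedData
// reports a boolean alongside an optional wait function for delivery.
type Client interface {
    NeedData(ctx context.Context, key []byte) (loaded bool, wait func(context.Context) error)
    Close()
}

// chanClient is a hypothetical Client that waits on a delivery channel,
// similar in spirit to the syncer client in this diff.
type chanClient struct {
    delivered chan struct{}
}

var _ Client = (*chanClient)(nil)

func (c *chanClient) NeedData(ctx context.Context, key []byte) (bool, func(context.Context) error) {
    // this sketch always reports false and always needs to wait
    return false, func(ctx context.Context) error {
        select {
        case <-c.delivered:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(500 * time.Millisecond): // stand-in for timeouts.SyncerClientWaitTimeout
            return errors.New("chunk not delivered in time")
        }
    }
}

func (c *chanClient) Close() {}

func main() {
    c := &chanClient{delivered: make(chan struct{})}
    loaded, wait := c.NeedData(context.Background(), []byte("ref"))
    if wait == nil {
        fmt.Println("nothing to fetch, loaded:", loaded)
        return
    }
    go func() { time.Sleep(50 * time.Millisecond); close(c.delivered) }()
    fmt.Println("wait returned:", wait(context.Background()))
}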

View File

@@ -93,20 +93,20 @@ func newTestClient(t string) *testClient {
}
}
func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error {
func (self *testClient) NeedData(ctx context.Context, hash []byte) (bool, func(context.Context) error) {
self.receivedHashes[string(hash)] = hash
if bytes.Equal(hash, hash0[:]) {
return func(context.Context) error {
return false, func(context.Context) error {
<-self.wait0
return nil
}
} else if bytes.Equal(hash, hash2[:]) {
return func(context.Context) error {
return false, func(context.Context) error {
<-self.wait2
return nil
}
}
return nil
return false, nil
}
func (self *testClient) Close() {}

View File

@@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network/timeouts"
"github.com/ethersphere/swarm/storage"
)
@@ -73,7 +74,7 @@ func (s *SwarmSyncerServer) Close() {
// GetData retrieves the actual chunk from netstore
func (s *SwarmSyncerServer) GetData(ctx context.Context, key []byte) ([]byte, error) {
ch, err := s.netStore.Get(ctx, chunk.ModeGetSync, storage.Address(key))
ch, err := s.netStore.Store.Get(ctx, chunk.ModeGetSync, storage.Address(key))
if err != nil {
return nil, err
}
@@ -198,9 +199,24 @@ func RegisterSwarmSyncerClient(streamer *Registry, netStore *storage.NetStore) {
})
}
// NeedData
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func(context.Context) error) {
return s.netStore.FetchFunc(ctx, key)
func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (loaded bool, wait func(context.Context) error) {
start := time.Now()
fi, loaded, ok := s.netStore.GetOrCreateFetcher(ctx, key, "syncer")
if !ok {
return loaded, nil
}
return loaded, func(ctx context.Context) error {
select {
case <-fi.Delivered:
metrics.GetOrRegisterResettingTimer(fmt.Sprintf("fetcher.%s.syncer", fi.CreatedBy), nil).UpdateSince(start)
case <-time.After(timeouts.SyncerClientWaitTimeout):
metrics.GetOrRegisterCounter("fetcher.syncer.timeout", nil).Inc(1)
return fmt.Errorf("chunk not delivered through syncing after %dsec. ref=%s", timeouts.SyncerClientWaitTimeout, fmt.Sprintf("%x", key))
}
return nil
}
}
func (s *SwarmSyncerClient) Close() {}