Revert "simplification of Fetchers (#1344)" (#1491)

This reverts commit 0b724bd4d5.
This commit is contained in:
Anton Evangelatov
2019-06-17 10:30:55 +02:00
committed by GitHub
parent 0b724bd4d5
commit 604960938b
23 changed files with 2242 additions and 742 deletions

View File

@@ -19,17 +19,26 @@ package stream
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
pq "github.com/ethersphere/swarm/network/priorityqueue"
"github.com/ethersphere/swarm/network/simulation"
"github.com/ethersphere/swarm/p2p/protocols"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil"
)
//Test requesting a chunk from a peer then issuing a "empty" OfferedHashesMsg (no hashes available yet)
@@ -151,12 +160,18 @@ func TestRequestFromPeers(t *testing.T) {
streamer: r,
}
r.setPeer(sp)
req := storage.NewRequest(storage.Address(hash0[:]))
id, err := delivery.FindPeer(context.TODO(), req)
req := network.NewRequest(
storage.Address(hash0[:]),
true,
&sync.Map{},
)
ctx := context.Background()
id, _, err := delivery.RequestFromPeers(ctx, req)
if err != nil {
t.Fatal(err)
}
if id.ID() != dummyPeerID {
if *id != dummyPeerID {
t.Fatalf("Expected an id, got %v", id)
}
}
@@ -186,10 +201,15 @@ func TestRequestFromPeersWithLightNode(t *testing.T) {
}
r.setPeer(sp)
req := storage.NewRequest(storage.Address(hash0[:]))
req := network.NewRequest(
storage.Address(hash0[:]),
true,
&sync.Map{},
)
ctx := context.Background()
// making a request which should return with "no peer found"
_, err := delivery.FindPeer(context.TODO(), req)
_, _, err := delivery.RequestFromPeers(ctx, req)
expectedError := "no peer found"
if err.Error() != expectedError {
@@ -280,3 +300,293 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
}
}
func TestDeliveryFromNodes(t *testing.T) {
testDeliveryFromNodes(t, 2, dataChunkCount, true)
testDeliveryFromNodes(t, 2, dataChunkCount, false)
testDeliveryFromNodes(t, 4, dataChunkCount, true)
testDeliveryFromNodes(t, 4, dataChunkCount, false)
if testutil.RaceEnabled {
// Travis cannot handle more nodes with -race; would time out.
return
}
testDeliveryFromNodes(t, 8, dataChunkCount, true)
testDeliveryFromNodes(t, 8, dataChunkCount, false)
testDeliveryFromNodes(t, 16, dataChunkCount, true)
testDeliveryFromNodes(t, 16, dataChunkCount, false)
}
func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool) {
t.Helper()
t.Run(fmt.Sprintf("testDeliveryFromNodes_%d_%d_skipCheck_%t", nodes, chunkCount, skipCheck), func(t *testing.T) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
}, nil)
bucket.Store(bucketKeyRegistry, r)
cleanup = func() {
r.Close()
clean()
}
return r, cleanup, nil
},
})
defer sim.Close()
log.Info("Adding nodes to simulation")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
t.Fatal(err)
}
log.Info("Starting simulation")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
//determine the pivot node to be the first node of the simulation
pivot := nodeIDs[0]
//distribute chunks of a random file into Stores of nodes 1 to nodes
//we will do this by creating a file store with an underlying round-robin store:
//the file store will create a hash for the uploaded file, but every chunk will be
//distributed to different nodes via round-robin scheduling
log.Debug("Writing file to round-robin file store")
//to do this, we create an array for chunkstores (length minus one, the pivot node)
stores := make([]storage.ChunkStore, len(nodeIDs)-1)
//we then need to get all stores from the sim....
lStores := sim.NodesItems(bucketKeyStore)
i := 0
//...iterate the buckets...
for id, bucketVal := range lStores {
//...and remove the one which is the pivot node
if id == pivot {
continue
}
//the other ones are added to the array...
stores[i] = bucketVal.(storage.ChunkStore)
i++
}
//...which then gets passed to the round-robin file store
roundRobinFileStore := storage.NewFileStore(newRoundRobinStore(stores...), storage.NewFileStoreParams(), chunk.NewTags())
//now we can actually upload a (random) file to the round-robin store
size := chunkCount * chunkSize
log.Debug("Storing data to file store")
fileHash, wait, err := roundRobinFileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
// wait until all chunks stored
if err != nil {
return err
}
err = wait(ctx)
if err != nil {
return err
}
//get the pivot node's filestore
item, ok := sim.NodeItem(pivot, bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
pivotFileStore := item.(*storage.FileStore)
log.Debug("Starting retrieval routine")
retErrC := make(chan error)
go func() {
// start the retrieval on the pivot node - this will spawn retrieve requests for missing chunks
// we must wait for the peer connections to have started before requesting
n, err := readAll(pivotFileStore, fileHash)
log.Info(fmt.Sprintf("retrieved %v", fileHash), "read", n, "err", err)
retErrC <- err
}()
disconnected := watchDisconnections(ctx, sim)
defer func() {
if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
}
}()
//finally check that the pivot node gets all chunks via the root hash
log.Debug("Check retrieval")
success := true
var total int64
total, err = readAll(pivotFileStore, fileHash)
if err != nil {
return err
}
log.Info(fmt.Sprintf("check if %08x is available locally: number of bytes read %v/%v (error: %v)", fileHash, total, size, err))
if err != nil || total != int64(size) {
success = false
}
if !success {
return fmt.Errorf("Test failed, chunks not available on all nodes")
}
if err := <-retErrC; err != nil {
return fmt.Errorf("requesting chunks: %v", err)
}
log.Debug("Test terminated successfully")
return nil
})
if result.Error != nil {
t.Fatal(result.Error)
}
})
}
func BenchmarkDeliveryFromNodesWithoutCheck(b *testing.B) {
for chunks := 32; chunks <= 128; chunks *= 2 {
for i := 2; i < 32; i *= 2 {
b.Run(
fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
func(b *testing.B) {
benchmarkDeliveryFromNodes(b, i, chunks, true)
},
)
}
}
}
func BenchmarkDeliveryFromNodesWithCheck(b *testing.B) {
for chunks := 32; chunks <= 128; chunks *= 2 {
for i := 2; i < 32; i *= 2 {
b.Run(
fmt.Sprintf("nodes=%v,chunks=%v", i, chunks),
func(b *testing.B) {
benchmarkDeliveryFromNodes(b, i, chunks, false)
},
)
}
}
}
func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck bool) {
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr, netStore, delivery, clean, err := newNetStoreAndDelivery(ctx, bucket)
if err != nil {
return nil, nil, err
}
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
SkipCheck: skipCheck,
Syncing: SyncingDisabled,
SyncUpdateDelay: 0,
}, nil)
bucket.Store(bucketKeyRegistry, r)
cleanup = func() {
r.Close()
clean()
}
return r, cleanup, nil
},
})
defer sim.Close()
log.Info("Initializing test config")
_, err := sim.AddNodesAndConnectChain(nodes)
if err != nil {
b.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
node := nodeIDs[len(nodeIDs)-1]
item, ok := sim.NodeItem(node, bucketKeyFileStore)
if !ok {
return errors.New("No filestore")
}
remoteFileStore := item.(*storage.FileStore)
pivotNode := nodeIDs[0]
item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
if !ok {
return errors.New("No filestore")
}
netStore := item.(*storage.NetStore)
if _, err := sim.WaitTillHealthy(ctx); err != nil {
return err
}
disconnected := watchDisconnections(ctx, sim)
defer func() {
if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
}
}()
// benchmark loop
b.ResetTimer()
b.StopTimer()
Loop:
for i := 0; i < b.N; i++ {
// uploading chunkCount random chunks to the last node
hashes := make([]storage.Address, chunkCount)
for i := 0; i < chunkCount; i++ {
// create actual size real chunks
ctx := context.TODO()
hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
if err != nil {
return fmt.Errorf("store: %v", err)
}
// wait until all chunks stored
err = wait(ctx)
if err != nil {
return fmt.Errorf("wait store: %v", err)
}
// collect the hashes
hashes[i] = hash
}
// now benchmark the actual retrieval
// netstore.Get is called for each hash in a go routine and errors are collected
b.StartTimer()
errs := make(chan error)
for _, hash := range hashes {
go func(h storage.Address) {
_, err := netStore.Get(ctx, chunk.ModeGetRequest, h)
log.Warn("test check netstore get", "hash", h, "err", err)
errs <- err
}(hash)
}
// count and report retrieval errors
// if there are misses then chunk timeout is too low for the distance and volume (?)
var total, misses int
for err := range errs {
if err != nil {
log.Warn(err.Error())
misses++
}
total++
if total == chunkCount {
break
}
}
b.StopTimer()
if misses > 0 {
err = fmt.Errorf("%v chunk not found out of %v", misses, total)
break Loop
}
}
return err
})
if result.Error != nil {
b.Fatal(result.Error)
}
}