Fix retrieval tests and simulation backends (#17723)
* swarm/network/stream: introduced visualized snapshot sync test * swarm/network/stream: non-existing hash visualization sim * swarm/network/stream: fixed retrieval tests; new backend for visualization * swarm/network/stream: cleanup of visualized_snapshot_sync_sim_test.go * swarm/network/stream: rebased PR on master * swarm/network/stream: fixed loop logic in retrieval tests * swarm/network/stream: fixed iterations for snapshot tests * swarm/network/stream: address PR comments * swarm/network/stream: addressed PR comments
This commit is contained in:
		@@ -58,6 +58,9 @@ type Event struct {
 | 
			
		||||
 | 
			
		||||
	// Msg is set if the type is EventTypeMsg
 | 
			
		||||
	Msg *Msg `json:"msg,omitempty"`
 | 
			
		||||
 | 
			
		||||
	//Optionally provide data (currently for simulation frontends only)
 | 
			
		||||
	Data interface{} `json:"data"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewEvent creates a new event for the given object which should be either a
 | 
			
		||||
 
 | 
			
		||||
@@ -104,21 +104,14 @@ func TestRetrieval(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
 | 
			
		||||
	"streamer": retrievalStreamerFunc,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
The test loads a snapshot file to construct the swarm network,
 | 
			
		||||
assuming that the snapshot file identifies a healthy
 | 
			
		||||
kademlia network. Nevertheless a health check runs in the
 | 
			
		||||
simulation's `action` function.
 | 
			
		||||
 | 
			
		||||
The snapshot should have 'streamer' in its service list.
 | 
			
		||||
*/
 | 
			
		||||
func runFileRetrievalTest(nodeCount int) error {
 | 
			
		||||
	sim := simulation.New(map[string]simulation.ServiceFunc{
 | 
			
		||||
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
 | 
			
		||||
			node := ctx.Config.Node()
 | 
			
		||||
			addr := network.NewAddr(node)
 | 
			
		||||
			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
 | 
			
		||||
func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
 | 
			
		||||
	n := ctx.Config.Node()
 | 
			
		||||
	addr := network.NewAddr(n)
 | 
			
		||||
	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, nil, err
 | 
			
		||||
	}
 | 
			
		||||
@@ -136,6 +129,7 @@ func runFileRetrievalTest(nodeCount int) error {
 | 
			
		||||
	r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 | 
			
		||||
		DoSync:          true,
 | 
			
		||||
		SyncUpdateDelay: 3 * time.Second,
 | 
			
		||||
		DoRetrieve:      true,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
 | 
			
		||||
@@ -148,9 +142,18 @@ func runFileRetrievalTest(nodeCount int) error {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return r, cleanup, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
/*
 | 
			
		||||
The test loads a snapshot file to construct the swarm network,
 | 
			
		||||
assuming that the snapshot file identifies a healthy
 | 
			
		||||
kademlia network. Nevertheless a health check runs in the
 | 
			
		||||
simulation's `action` function.
 | 
			
		||||
 | 
			
		||||
The snapshot should have 'streamer' in its service list.
 | 
			
		||||
*/
 | 
			
		||||
func runFileRetrievalTest(nodeCount int) error {
 | 
			
		||||
	sim := simulation.New(retrievalSimServiceMap)
 | 
			
		||||
	defer sim.Close()
 | 
			
		||||
 | 
			
		||||
	log.Info("Initializing test config")
 | 
			
		||||
@@ -200,22 +203,13 @@ func runFileRetrievalTest(nodeCount int) error {
 | 
			
		||||
 | 
			
		||||
		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 | 
			
		||||
		// or until the timeout is reached.
 | 
			
		||||
		allSuccess := false
 | 
			
		||||
		for !allSuccess {
 | 
			
		||||
	REPEAT:
 | 
			
		||||
		for {
 | 
			
		||||
			for _, id := range nodeIDs {
 | 
			
		||||
				//for each expected chunk, check if it is in the local store
 | 
			
		||||
				localChunks := conf.idToChunksMap[id]
 | 
			
		||||
				localSuccess := true
 | 
			
		||||
				for _, ch := range localChunks {
 | 
			
		||||
					//get the real chunk by the index in the index array
 | 
			
		||||
					chunk := conf.hashes[ch]
 | 
			
		||||
					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
 | 
			
		||||
					//check if the expected chunk is indeed in the localstore
 | 
			
		||||
					var err error
 | 
			
		||||
					//check on the node's FileStore (netstore)
 | 
			
		||||
				//for each expected file, check if it is in the local store
 | 
			
		||||
				item, ok := sim.NodeItem(id, bucketKeyFileStore)
 | 
			
		||||
				if !ok {
 | 
			
		||||
						return fmt.Errorf("No registry")
 | 
			
		||||
					return fmt.Errorf("No filestore")
 | 
			
		||||
				}
 | 
			
		||||
				fileStore := item.(*storage.FileStore)
 | 
			
		||||
				//check all chunks
 | 
			
		||||
@@ -223,26 +217,15 @@ func runFileRetrievalTest(nodeCount int) error {
 | 
			
		||||
					reader, _ := fileStore.Retrieve(context.TODO(), hash)
 | 
			
		||||
					//check that we can read the file size and that it corresponds to the generated file size
 | 
			
		||||
					if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
 | 
			
		||||
							allSuccess = false
 | 
			
		||||
						log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
 | 
			
		||||
						} else {
 | 
			
		||||
						time.Sleep(500 * time.Millisecond)
 | 
			
		||||
						continue REPEAT
 | 
			
		||||
					}
 | 
			
		||||
					log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
 | 
			
		||||
						localSuccess = false
 | 
			
		||||
					} else {
 | 
			
		||||
						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				allSuccess = localSuccess
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !allSuccess {
 | 
			
		||||
			return fmt.Errorf("Not all chunks succeeded!")
 | 
			
		||||
		}
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if result.Error != nil {
 | 
			
		||||
@@ -263,44 +246,7 @@ simulation's `action` function.
 | 
			
		||||
The snapshot should have 'streamer' in its service list.
 | 
			
		||||
*/
 | 
			
		||||
func runRetrievalTest(chunkCount int, nodeCount int) error {
 | 
			
		||||
	sim := simulation.New(map[string]simulation.ServiceFunc{
 | 
			
		||||
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
 | 
			
		||||
			node := ctx.Config.Node()
 | 
			
		||||
			addr := network.NewAddr(node)
 | 
			
		||||
			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, nil, err
 | 
			
		||||
			}
 | 
			
		||||
			bucket.Store(bucketKeyStore, store)
 | 
			
		||||
 | 
			
		||||
			localStore := store.(*storage.LocalStore)
 | 
			
		||||
			netStore, err := storage.NewNetStore(localStore, nil)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, nil, err
 | 
			
		||||
			}
 | 
			
		||||
			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
 | 
			
		||||
			delivery := NewDelivery(kad, netStore)
 | 
			
		||||
			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
 | 
			
		||||
 | 
			
		||||
			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
 | 
			
		||||
				DoSync:          true,
 | 
			
		||||
				SyncUpdateDelay: 0,
 | 
			
		||||
			})
 | 
			
		||||
 | 
			
		||||
			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
 | 
			
		||||
			bucketKeyFileStore = simulation.BucketKey("filestore")
 | 
			
		||||
			bucket.Store(bucketKeyFileStore, fileStore)
 | 
			
		||||
 | 
			
		||||
			cleanup = func() {
 | 
			
		||||
				os.RemoveAll(datadir)
 | 
			
		||||
				netStore.Close()
 | 
			
		||||
				r.Close()
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			return r, cleanup, nil
 | 
			
		||||
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
	sim := simulation.New(retrievalSimServiceMap)
 | 
			
		||||
	defer sim.Close()
 | 
			
		||||
 | 
			
		||||
	conf := &synctestConfig{}
 | 
			
		||||
@@ -330,8 +276,6 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
 | 
			
		||||
			conf.addrToIDMap[string(a)] = n
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		//an array for the random files
 | 
			
		||||
		var randomFiles []string
 | 
			
		||||
		//this is the node selected for upload
 | 
			
		||||
		node := sim.RandomUpNode()
 | 
			
		||||
		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
 | 
			
		||||
@@ -349,49 +293,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
 | 
			
		||||
 | 
			
		||||
		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 | 
			
		||||
		// or until the timeout is reached.
 | 
			
		||||
		allSuccess := false
 | 
			
		||||
		for !allSuccess {
 | 
			
		||||
	REPEAT:
 | 
			
		||||
		for {
 | 
			
		||||
			for _, id := range nodeIDs {
 | 
			
		||||
				//for each expected chunk, check if it is in the local store
 | 
			
		||||
				localChunks := conf.idToChunksMap[id]
 | 
			
		||||
				localSuccess := true
 | 
			
		||||
				for _, ch := range localChunks {
 | 
			
		||||
					//get the real chunk by the index in the index array
 | 
			
		||||
					chunk := conf.hashes[ch]
 | 
			
		||||
					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
 | 
			
		||||
					//check if the expected chunk is indeed in the localstore
 | 
			
		||||
					var err error
 | 
			
		||||
				//check on the node's FileStore (netstore)
 | 
			
		||||
				item, ok := sim.NodeItem(id, bucketKeyFileStore)
 | 
			
		||||
				if !ok {
 | 
			
		||||
						return fmt.Errorf("No registry")
 | 
			
		||||
					return fmt.Errorf("No filestore")
 | 
			
		||||
				}
 | 
			
		||||
				fileStore := item.(*storage.FileStore)
 | 
			
		||||
				//check all chunks
 | 
			
		||||
					for i, hash := range conf.hashes {
 | 
			
		||||
				for _, hash := range conf.hashes {
 | 
			
		||||
					reader, _ := fileStore.Retrieve(context.TODO(), hash)
 | 
			
		||||
						//check that we can read the file size and that it corresponds to the generated file size
 | 
			
		||||
						if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
 | 
			
		||||
							allSuccess = false
 | 
			
		||||
							log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
 | 
			
		||||
						} else {
 | 
			
		||||
							log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
 | 
			
		||||
					//check that we can read the chunk size and that it corresponds to the generated chunk size
 | 
			
		||||
					if s, err := reader.Size(ctx, nil); err != nil || s != int64(chunkSize) {
 | 
			
		||||
						log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s)
 | 
			
		||||
						time.Sleep(500 * time.Millisecond)
 | 
			
		||||
						continue REPEAT
 | 
			
		||||
					}
 | 
			
		||||
					log.Debug(fmt.Sprintf("Chunk with root hash %x successfully retrieved", hash))
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
 | 
			
		||||
						localSuccess = false
 | 
			
		||||
					} else {
 | 
			
		||||
						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				allSuccess = localSuccess
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !allSuccess {
 | 
			
		||||
			return fmt.Errorf("Not all chunks succeeded!")
 | 
			
		||||
		}
 | 
			
		||||
			// all nodes and files found, exit loop and return without error
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if result.Error != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -31,6 +31,7 @@ import (
 | 
			
		||||
	"github.com/ethereum/go-ethereum/node"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/p2p"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/p2p/enode"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/p2p/simulations"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/swarm/network"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 | 
			
		||||
@@ -50,6 +51,17 @@ type synctestConfig struct {
 | 
			
		||||
	addrToIDMap map[string]enode.ID
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	// EventTypeNode is the type of event emitted when a node is either
 | 
			
		||||
	// created, started or stopped
 | 
			
		||||
	EventTypeChunkCreated   simulations.EventType = "chunkCreated"
 | 
			
		||||
	EventTypeChunkOffered   simulations.EventType = "chunkOffered"
 | 
			
		||||
	EventTypeChunkWanted    simulations.EventType = "chunkWanted"
 | 
			
		||||
	EventTypeChunkDelivered simulations.EventType = "chunkDelivered"
 | 
			
		||||
	EventTypeChunkArrived   simulations.EventType = "chunkArrived"
 | 
			
		||||
	EventTypeSimTerminated  simulations.EventType = "simTerminated"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Tests in this file should not request chunks from peers.
 | 
			
		||||
// This function will panic indicating that there is a problem if request has been made.
 | 
			
		||||
func dummyRequestFromPeers(_ context.Context, req *network.Request) (*enode.ID, chan struct{}, error) {
 | 
			
		||||
@@ -131,9 +143,11 @@ func TestSyncingViaDirectSubscribe(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 | 
			
		||||
	sim := simulation.New(map[string]simulation.ServiceFunc{
 | 
			
		||||
		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
 | 
			
		||||
var simServiceMap = map[string]simulation.ServiceFunc{
 | 
			
		||||
	"streamer": streamerFunc,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func streamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
 | 
			
		||||
	n := ctx.Config.Node()
 | 
			
		||||
	addr := network.NewAddr(n)
 | 
			
		||||
	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
 | 
			
		||||
@@ -154,6 +168,7 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 | 
			
		||||
		DoSync:          true,
 | 
			
		||||
		SyncUpdateDelay: 3 * time.Second,
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	bucket.Store(bucketKeyRegistry, r)
 | 
			
		||||
 | 
			
		||||
	cleanup = func() {
 | 
			
		||||
@@ -164,8 +179,10 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 | 
			
		||||
 | 
			
		||||
	return r, cleanup, nil
 | 
			
		||||
 | 
			
		||||
		},
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 | 
			
		||||
	sim := simulation.New(simServiceMap)
 | 
			
		||||
	defer sim.Close()
 | 
			
		||||
 | 
			
		||||
	log.Info("Initializing test config")
 | 
			
		||||
@@ -204,7 +221,17 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
 | 
			
		||||
	result := runSim(conf, ctx, sim, chunkCount)
 | 
			
		||||
 | 
			
		||||
	if result.Error != nil {
 | 
			
		||||
		t.Fatal(result.Error)
 | 
			
		||||
	}
 | 
			
		||||
	log.Info("Simulation ended")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulation, chunkCount int) simulation.Result {
 | 
			
		||||
 | 
			
		||||
	return sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
 | 
			
		||||
		nodeIDs := sim.UpNodeIDs()
 | 
			
		||||
		for _, n := range nodeIDs {
 | 
			
		||||
			//get the kademlia overlay address from this ID
 | 
			
		||||
@@ -229,12 +256,19 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		for _, h := range hashes {
 | 
			
		||||
			evt := &simulations.Event{
 | 
			
		||||
				Type: EventTypeChunkCreated,
 | 
			
		||||
				Node: sim.Net.GetNode(node.ID),
 | 
			
		||||
				Data: h.String(),
 | 
			
		||||
			}
 | 
			
		||||
			sim.Net.Events().Send(evt)
 | 
			
		||||
		}
 | 
			
		||||
		conf.hashes = append(conf.hashes, hashes...)
 | 
			
		||||
		mapKeysToNodes(conf)
 | 
			
		||||
 | 
			
		||||
		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 | 
			
		||||
		// or until the timeout is reached.
 | 
			
		||||
		allSuccess := false
 | 
			
		||||
		var gDir string
 | 
			
		||||
		var globalStore *mockdb.GlobalStore
 | 
			
		||||
		if *useMockStore {
 | 
			
		||||
@@ -250,12 +284,11 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 | 
			
		||||
				}
 | 
			
		||||
			}()
 | 
			
		||||
		}
 | 
			
		||||
		for !allSuccess {
 | 
			
		||||
			allSuccess = true
 | 
			
		||||
	REPEAT:
 | 
			
		||||
		for {
 | 
			
		||||
			for _, id := range nodeIDs {
 | 
			
		||||
				//for each expected chunk, check if it is in the local store
 | 
			
		||||
				localChunks := conf.idToChunksMap[id]
 | 
			
		||||
				localSuccess := true
 | 
			
		||||
				for _, ch := range localChunks {
 | 
			
		||||
					//get the real chunk by the index in the index array
 | 
			
		||||
					chunk := conf.hashes[ch]
 | 
			
		||||
@@ -277,29 +310,22 @@ func testSyncingViaGlobalSync(t *testing.T, chunkCount int, nodeCount int) {
 | 
			
		||||
					}
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
 | 
			
		||||
						localSuccess = false
 | 
			
		||||
						// Do not get crazy with logging the warn message
 | 
			
		||||
						time.Sleep(500 * time.Millisecond)
 | 
			
		||||
					} else {
 | 
			
		||||
						continue REPEAT
 | 
			
		||||
					}
 | 
			
		||||
					evt := &simulations.Event{
 | 
			
		||||
						Type: EventTypeChunkArrived,
 | 
			
		||||
						Node: sim.Net.GetNode(id),
 | 
			
		||||
						Data: chunk.String(),
 | 
			
		||||
					}
 | 
			
		||||
					sim.Net.Events().Send(evt)
 | 
			
		||||
					log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
				if !localSuccess {
 | 
			
		||||
					allSuccess = false
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !allSuccess {
 | 
			
		||||
			return fmt.Errorf("Not all chunks succeeded!")
 | 
			
		||||
		}
 | 
			
		||||
			return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if result.Error != nil {
 | 
			
		||||
		t.Fatal(result.Error)
 | 
			
		||||
		}
 | 
			
		||||
	log.Info("Simulation ended")
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
@@ -459,13 +485,11 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
 | 
			
		||||
		}
 | 
			
		||||
		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 | 
			
		||||
		// or until the timeout is reached.
 | 
			
		||||
		allSuccess := false
 | 
			
		||||
		for !allSuccess {
 | 
			
		||||
			allSuccess = true
 | 
			
		||||
	REPEAT:
 | 
			
		||||
		for {
 | 
			
		||||
			for _, id := range nodeIDs {
 | 
			
		||||
				//for each expected chunk, check if it is in the local store
 | 
			
		||||
				localChunks := conf.idToChunksMap[id]
 | 
			
		||||
				localSuccess := true
 | 
			
		||||
				for _, ch := range localChunks {
 | 
			
		||||
					//get the real chunk by the index in the index array
 | 
			
		||||
					chunk := conf.hashes[ch]
 | 
			
		||||
@@ -487,23 +511,15 @@ func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int)
 | 
			
		||||
					}
 | 
			
		||||
					if err != nil {
 | 
			
		||||
						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
 | 
			
		||||
						localSuccess = false
 | 
			
		||||
						// Do not get crazy with logging the warn message
 | 
			
		||||
						time.Sleep(500 * time.Millisecond)
 | 
			
		||||
					} else {
 | 
			
		||||
						continue REPEAT
 | 
			
		||||
					}
 | 
			
		||||
					log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
				if !localSuccess {
 | 
			
		||||
					allSuccess = false
 | 
			
		||||
					break
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !allSuccess {
 | 
			
		||||
			return fmt.Errorf("Not all chunks succeeded!")
 | 
			
		||||
		}
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if result.Error != nil {
 | 
			
		||||
 
 | 
			
		||||
@@ -522,7 +522,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)")
 | 
			
		||||
	if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: tester.Nodes[0].ID(), Error: expectedError}); err != nil {
 | 
			
		||||
	if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										225
									
								
								swarm/network/stream/visualized_snapshot_sync_sim_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										225
									
								
								swarm/network/stream/visualized_snapshot_sync_sim_test.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,225 @@
 | 
			
		||||
// Copyright 2018 The go-ethereum Authors
 | 
			
		||||
// This file is part of the go-ethereum library.
 | 
			
		||||
//
 | 
			
		||||
// The go-ethereum library is free software: you can redistribute it and/or modify
 | 
			
		||||
// it under the terms of the GNU Lesser General Public License as published by
 | 
			
		||||
// the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
// (at your option) any later version.
 | 
			
		||||
//
 | 
			
		||||
// The go-ethereum library is distributed in the hope that it will be useful,
 | 
			
		||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
			
		||||
// GNU Lesser General Public License for more details.
 | 
			
		||||
//
 | 
			
		||||
// You should have received a copy of the GNU Lesser General Public License
 | 
			
		||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
// +build withserver
 | 
			
		||||
 | 
			
		||||
package stream
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/ethereum/go-ethereum/p2p"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/p2p/discover"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/p2p/simulations"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/swarm/log"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/swarm/network/simulation"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/swarm/storage"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
The tests in this file need to be executed with
 | 
			
		||||
 | 
			
		||||
			-tags=withserver
 | 
			
		||||
 | 
			
		||||
Also, they will stall if executed stand-alone, because they wait
 | 
			
		||||
for the visualization frontend to send a POST /runsim message.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
//setup the sim, evaluate nodeCount and chunkCount and create the sim
 | 
			
		||||
func setupSim(serviceMap map[string]simulation.ServiceFunc) (int, int, *simulation.Simulation) {
 | 
			
		||||
	nodeCount := *nodes
 | 
			
		||||
	chunkCount := *chunks
 | 
			
		||||
 | 
			
		||||
	if nodeCount == 0 || chunkCount == 0 {
 | 
			
		||||
		nodeCount = 32
 | 
			
		||||
		chunkCount = 1
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//setup the simulation with server, which means the sim won't run
 | 
			
		||||
	//until it receives a POST /runsim from the frontend
 | 
			
		||||
	sim := simulation.New(serviceMap).WithServer(":8888")
 | 
			
		||||
	return nodeCount, chunkCount, sim
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//watch for disconnections and wait for healthy
 | 
			
		||||
func watchSim(sim *simulation.Simulation) (context.Context, context.CancelFunc) {
 | 
			
		||||
	ctx, cancelSimRun := context.WithTimeout(context.Background(), 1*time.Minute)
 | 
			
		||||
 | 
			
		||||
	if _, err := sim.WaitTillHealthy(ctx, 2); err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	disconnections := sim.PeerEvents(
 | 
			
		||||
		context.Background(),
 | 
			
		||||
		sim.NodeIDs(),
 | 
			
		||||
		simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeDrop),
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		for d := range disconnections {
 | 
			
		||||
			log.Error("peer drop", "node", d.NodeID, "peer", d.Event.Peer)
 | 
			
		||||
			panic("unexpected disconnect")
 | 
			
		||||
			cancelSimRun()
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return ctx, cancelSimRun
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//This test requests bogus hashes into the network
 | 
			
		||||
func TestNonExistingHashesWithServer(t *testing.T) {
 | 
			
		||||
	nodeCount, _, sim := setupSim(retrievalSimServiceMap)
 | 
			
		||||
	defer sim.Close()
 | 
			
		||||
 | 
			
		||||
	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, cancelSimRun := watchSim(sim)
 | 
			
		||||
	defer cancelSimRun()
 | 
			
		||||
 | 
			
		||||
	//in order to get some meaningful visualization, it is beneficial
 | 
			
		||||
	//to define a minimum duration of this test
 | 
			
		||||
	testDuration := 20 * time.Second
 | 
			
		||||
 | 
			
		||||
	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
 | 
			
		||||
		//check on the node's FileStore (netstore)
 | 
			
		||||
		id := sim.RandomUpNode().ID
 | 
			
		||||
		item, ok := sim.NodeItem(id, bucketKeyFileStore)
 | 
			
		||||
		if !ok {
 | 
			
		||||
			t.Fatalf("No filestore")
 | 
			
		||||
		}
 | 
			
		||||
		fileStore := item.(*storage.FileStore)
 | 
			
		||||
		//create a bogus hash
 | 
			
		||||
		fakeHash := storage.GenerateRandomChunk(1000).Address()
 | 
			
		||||
		//try to retrieve it - will propagate RetrieveRequestMsg into the network
 | 
			
		||||
		reader, _ := fileStore.Retrieve(context.TODO(), fakeHash)
 | 
			
		||||
		if _, err := reader.Size(ctx, nil); err != nil {
 | 
			
		||||
			log.Debug("expected error for non-existing chunk")
 | 
			
		||||
		}
 | 
			
		||||
		//sleep so that the frontend can have something to display
 | 
			
		||||
		time.Sleep(testDuration)
 | 
			
		||||
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
	if result.Error != nil {
 | 
			
		||||
		sendSimTerminatedEvent(sim)
 | 
			
		||||
		t.Fatal(result.Error)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	sendSimTerminatedEvent(sim)
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//send a termination event to the frontend
 | 
			
		||||
func sendSimTerminatedEvent(sim *simulation.Simulation) {
 | 
			
		||||
	evt := &simulations.Event{
 | 
			
		||||
		Type:    EventTypeSimTerminated,
 | 
			
		||||
		Control: false,
 | 
			
		||||
	}
 | 
			
		||||
	sim.Net.Events().Send(evt)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//This test is the same as the snapshot sync test,
 | 
			
		||||
//but with a HTTP server
 | 
			
		||||
//It also sends some custom events so that the frontend
 | 
			
		||||
//can visualize messages like SendOfferedMsg, WantedHashesMsg, DeliveryMsg
 | 
			
		||||
func TestSnapshotSyncWithServer(t *testing.T) {
 | 
			
		||||
 | 
			
		||||
	nodeCount, chunkCount, sim := setupSim(simServiceMap)
 | 
			
		||||
	defer sim.Close()
 | 
			
		||||
 | 
			
		||||
	log.Info("Initializing test config")
 | 
			
		||||
 | 
			
		||||
	conf := &synctestConfig{}
 | 
			
		||||
	//map of discover ID to indexes of chunks expected at that ID
 | 
			
		||||
	conf.idToChunksMap = make(map[discover.NodeID][]int)
 | 
			
		||||
	//map of overlay address to discover ID
 | 
			
		||||
	conf.addrToIDMap = make(map[string]discover.NodeID)
 | 
			
		||||
	//array where the generated chunk hashes will be stored
 | 
			
		||||
	conf.hashes = make([]storage.Address, 0)
 | 
			
		||||
 | 
			
		||||
	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, cancelSimRun := watchSim(sim)
 | 
			
		||||
	defer cancelSimRun()
 | 
			
		||||
 | 
			
		||||
	//setup filters in the event feed
 | 
			
		||||
	offeredHashesFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(1)
 | 
			
		||||
	wantedFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(2)
 | 
			
		||||
	deliveryFilter := simulation.NewPeerEventsFilter().Type(p2p.PeerEventTypeMsgRecv).Protocol("stream").MsgCode(6)
 | 
			
		||||
	eventC := sim.PeerEvents(ctx, sim.UpNodeIDs(), offeredHashesFilter, wantedFilter, deliveryFilter)
 | 
			
		||||
 | 
			
		||||
	quit := make(chan struct{})
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		for e := range eventC {
 | 
			
		||||
			select {
 | 
			
		||||
			case <-quit:
 | 
			
		||||
				fmt.Println("quitting event loop")
 | 
			
		||||
				return
 | 
			
		||||
			default:
 | 
			
		||||
			}
 | 
			
		||||
			if e.Error != nil {
 | 
			
		||||
				t.Fatal(e.Error)
 | 
			
		||||
			}
 | 
			
		||||
			if *e.Event.MsgCode == uint64(1) {
 | 
			
		||||
				evt := &simulations.Event{
 | 
			
		||||
					Type:    EventTypeChunkOffered,
 | 
			
		||||
					Node:    sim.Net.GetNode(e.NodeID),
 | 
			
		||||
					Control: false,
 | 
			
		||||
				}
 | 
			
		||||
				sim.Net.Events().Send(evt)
 | 
			
		||||
			} else if *e.Event.MsgCode == uint64(2) {
 | 
			
		||||
				evt := &simulations.Event{
 | 
			
		||||
					Type:    EventTypeChunkWanted,
 | 
			
		||||
					Node:    sim.Net.GetNode(e.NodeID),
 | 
			
		||||
					Control: false,
 | 
			
		||||
				}
 | 
			
		||||
				sim.Net.Events().Send(evt)
 | 
			
		||||
			} else if *e.Event.MsgCode == uint64(6) {
 | 
			
		||||
				evt := &simulations.Event{
 | 
			
		||||
					Type:    EventTypeChunkDelivered,
 | 
			
		||||
					Node:    sim.Net.GetNode(e.NodeID),
 | 
			
		||||
					Control: false,
 | 
			
		||||
				}
 | 
			
		||||
				sim.Net.Events().Send(evt)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	//run the sim
 | 
			
		||||
	result := runSim(conf, ctx, sim, chunkCount)
 | 
			
		||||
 | 
			
		||||
	//send terminated event
 | 
			
		||||
	evt := &simulations.Event{
 | 
			
		||||
		Type:    EventTypeSimTerminated,
 | 
			
		||||
		Control: false,
 | 
			
		||||
	}
 | 
			
		||||
	sim.Net.Events().Send(evt)
 | 
			
		||||
 | 
			
		||||
	if result.Error != nil {
 | 
			
		||||
		panic(result.Error)
 | 
			
		||||
	}
 | 
			
		||||
	close(quit)
 | 
			
		||||
	log.Info("Simulation ended")
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user