swarm/network/stream: add syncing test (#1399)

* swarm/network/stream: remove the old syncing test and replace it with two more granular tests. The first tests a full sync between two nodes: once subscriptions are established on all bins, it verifies that all content is replicated from one node to the other. The second adds a larger number of nodes in a star network topology, so the pivot node has a depth > 0 and syncs only certain content to each connected node. Because a star topology is always one hop, this test does not check that specific content is routed across multiple nodes; it assumes content should only be on the most proximate node.

* api, cmd/swarm-smoke, fuse, network, storage: fix a data directory leak when using an ephemeral filestore, and pull test parameters out so they are easily adjustable (see the sketch of the new cleanup pattern below)
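For illustration, a minimal sketch of the cleanup pattern this change introduces, assuming the new three-value signature of storage.NewLocalFileStore shown in the diff below. The package clause, test name and temp-dir prefix are hypothetical; only the call shape mirrors the updated tests.

package storage_test

import (
	"io/ioutil"
	"os"
	"testing"

	"github.com/ethersphere/swarm/chunk"
	"github.com/ethersphere/swarm/storage"
)

// TestLocalFileStoreCleanup is an illustrative example, not part of this commit.
func TestLocalFileStoreCleanup(t *testing.T) {
	// ephemeral data dir for the filestore
	datadir, err := ioutil.TempDir("", "filestore-example")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(datadir)

	// NewLocalFileStore now also returns a cleanup func that closes the
	// underlying localstore, so the data directory is released on test exit.
	fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	_ = fileStore // use the FileStore as the updated tests in this diff do
}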
acud authored on 2019-06-10 10:52:19 +02:00, committed by GitHub
parent ee7ccbdb4d
commit c1126aaa1b
6 changed files with 363 additions and 100 deletions

View File

@ -53,10 +53,11 @@ func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) {
}
defer os.RemoveAll(datadir)
tags := chunk.NewTags()
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
if err != nil {
return
}
defer cleanup()
api := NewAPI(fileStore, nil, nil, nil, tags)
f(api, tags, v)
}

View File

@ -258,10 +258,11 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
if err != nil {
return nil, err
}
defer cleanup()
reader := bytes.NewReader(testData)
return fileStore.GetAllReferences(context.Background(), reader, false)

View File

@ -1615,10 +1615,11 @@ func TestFUSE(t *testing.T) {
}
defer os.RemoveAll(datadir)
fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
if err != nil {
t.Fatal(err)
}
defer cleanup()
ta := &testAPI{api: api.NewAPI(fileStore, nil, nil, nil, chunk.NewTags())}
//run a short suite of tests

View File

@ -17,6 +17,7 @@
package stream
import (
"bytes"
"context"
"errors"
"flag"
@ -44,7 +45,6 @@ import (
"github.com/ethersphere/swarm/storage/localstore"
"github.com/ethersphere/swarm/storage/mock"
"github.com/ethersphere/swarm/testutil"
colorable "github.com/mattn/go-colorable"
)
var (
@ -69,7 +69,7 @@ func init() {
rand.Seed(time.Now().UnixNano())
log.PrintOrigins(true)
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
}
// newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
@ -399,3 +399,19 @@ func (b *boolean) bool() bool {
return b.v
}
func getAllRefs(testData []byte) (storage.AddressCollection, error) {
datadir, err := ioutil.TempDir("", "chunk-debug")
if err != nil {
return nil, fmt.Errorf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(datadir)
fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
if err != nil {
return nil, err
}
defer cleanup()
reader := bytes.NewReader(testData)
return fileStore.GetAllReferences(context.Background(), reader, false)
}

View File

@ -17,11 +17,13 @@
package stream
import (
"bytes"
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"strings"
"sync"
"testing"
"time"
@ -38,28 +40,23 @@ import (
"github.com/ethersphere/swarm/testutil"
)
const dataChunkCount = 200
func TestSyncerSimulation(t *testing.T) {
testSyncBetweenNodes(t, 2, dataChunkCount, true, 1)
// This test uses much more memory when running with
// race detector. Allow it to finish successfully by
// reducing its scope, and still check for data races
// with the smallest number of nodes.
if !testutil.RaceEnabled {
testSyncBetweenNodes(t, 4, dataChunkCount, true, 1)
testSyncBetweenNodes(t, 8, dataChunkCount, true, 1)
testSyncBetweenNodes(t, 16, dataChunkCount, true, 1)
}
}
func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, po uint8) {
const dataChunkCount = 1000
// TestTwoNodesFullSync connects two nodes, uploads content to one node and expects the
// uploader node's chunks to be synced to the second node. This is the expected behaviour,
// because even though the two nodes might share address bits, the kademlia depth stays 0
// while the peer count is under ProxBinSize, so subscriptions are eventually established
// on all bins between the two nodes, resulting in a full sync between them.
// The test checks that:
// 1. All subscriptions are created
// 2. All chunks are transferred from one node to the other (asserted by summing and comparing bin indexes on both nodes)
func TestTwoNodesFullSync(t *testing.T) { //
var (
chunkCount = 1000 //~4mb
syncTime = 5 * time.Second
)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr := network.NewAddr(ctx.Config.Node())
//hack to put addresses in same space
addr.OAddr[0] = byte(0)
netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
if err != nil {
@ -83,8 +80,9 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
}
r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
Syncing: SyncingAutoSubscribe,
SkipCheck: skipCheck,
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 500 * time.Millisecond, //this is needed to trigger the update subscriptions loop
SkipCheck: true,
}, nil)
cleanup = func() {
@ -106,12 +104,16 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
// defer cancel should come before defer simulation teardown
defer cancel()
_, err := sim.AddNodesAndConnectChain(nodes)
_, err := sim.AddNodesAndConnectChain(2)
if err != nil {
t.Fatal(err)
}
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
if len(nodeIDs) != 2 {
return errors.New("not enough nodes up")
}
nodeIndex := make(map[enode.ID]int)
for i, id := range nodeIDs {
@ -125,92 +127,105 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
}
}()
// each node Subscribes to each other's swarmChunkServerStreamName
for j := 0; j < nodes-1; j++ {
id := nodeIDs[j]
client, err := sim.Net.GetNode(id).Client()
item, ok := sim.NodeItem(nodeIDs[0], bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
fileStore := item.(*storage.FileStore)
size := chunkCount * chunkSize
_, wait1, err := fileStore.Store(ctx, testutil.RandomReader(0, size), int64(size), false)
if err != nil {
return fmt.Errorf("fileStore.Store: %v", err)
}
_, wait2, err := fileStore.Store(ctx, testutil.RandomReader(10, size), int64(size), false)
if err != nil {
return fmt.Errorf("fileStore.Store: %v", err)
}
wait1(ctx)
wait2(ctx)
time.Sleep(1 * time.Second)
//explicitly check that all subscriptions are there on all bins
for idx, id := range nodeIDs {
node := sim.Net.GetNode(id)
client, err := node.Client()
if err != nil {
return fmt.Errorf("node %s client: %v", id, err)
return fmt.Errorf("create node %d rpc client fail: %v", idx, err)
}
sid := nodeIDs[j+1]
client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
//ask it for subscriptions
pstreams := make(map[string][]string)
err = client.Call(&pstreams, "stream_getPeerServerSubscriptions")
if err != nil {
return err
return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
}
if j > 0 || nodes == 2 {
item, ok := sim.NodeItem(nodeIDs[j], bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
for _, streams := range pstreams {
b := make([]bool, 17)
for _, sub := range streams {
subPO, err := ParseSyncBinKey(strings.Split(sub, "|")[1])
if err != nil {
return err
}
b[int(subPO)] = true
}
fileStore := item.(*storage.FileStore)
size := chunkCount * chunkSize
_, wait, err := fileStore.Store(ctx, testutil.RandomReader(j, size), int64(size), false)
if err != nil {
return fmt.Errorf("fileStore.Store: %v", err)
for bin, v := range b {
if !v {
return fmt.Errorf("did not find any subscriptions for node %d on bin %d", idx, bin)
}
}
wait(ctx)
}
}
// here we distribute chunks of a random file into stores 1...nodes
// collect hashes in po 1 bin for each node
hashes := make([][]storage.Address, nodes)
totalHashes := 0
hashCounts := make([]int, nodes)
for i := nodes - 1; i >= 0; i-- {
if i < nodes-1 {
hashCounts[i] = hashCounts[i+1]
log.Debug("subscriptions on all bins exist between the two nodes, proceeding to check bin indexes")
log.Debug("uploader node", "enode", nodeIDs[0])
item, ok = sim.NodeItem(nodeIDs[0], bucketKeyStore)
if !ok {
return fmt.Errorf("No DB")
}
store := item.(chunk.Store)
uploaderNodeBinIDs := make([]uint64, 17)
log.Debug("checking pull subscription bin ids")
for po := 0; po <= 16; po++ {
until, err := store.LastPullSubscriptionBinID(uint8(po))
if err != nil {
t.Fatal(err)
}
item, ok := sim.NodeItem(nodeIDs[i], bucketKeyStore)
uploaderNodeBinIDs[po] = until
}
// wait for syncing
time.Sleep(syncTime)
// check that the sum of bin indexes is equal
for idx := range nodeIDs {
if nodeIDs[idx] == nodeIDs[0] {
continue
}
log.Debug("compare to", "enode", nodeIDs[idx])
item, ok = sim.NodeItem(nodeIDs[idx], bucketKeyStore)
if !ok {
return fmt.Errorf("No DB")
}
store := item.(chunk.Store)
until, err := store.LastPullSubscriptionBinID(po)
if err != nil {
return err
}
if until > 0 {
c, _ := store.SubscribePull(ctx, po, 0, until)
for iterate := true; iterate; {
select {
case cd, ok := <-c:
if !ok {
iterate = false
break
}
hashes[i] = append(hashes[i], cd.Address)
totalHashes++
hashCounts[i]++
case <-ctx.Done():
return ctx.Err()
}
}
}
}
var total, found int
for _, node := range nodeIDs {
i := nodeIndex[node]
db := item.(chunk.Store)
for j := i; j < nodes; j++ {
total += len(hashes[j])
for _, key := range hashes[j] {
item, ok := sim.NodeItem(nodeIDs[j], bucketKeyStore)
if !ok {
return fmt.Errorf("No DB")
}
db := item.(chunk.Store)
_, err := db.Get(ctx, chunk.ModeGetRequest, key)
if err == nil {
found++
}
uploaderSum, otherNodeSum := 0, 0
for po, uploaderUntil := range uploaderNodeBinIDs {
shouldUntil, err := db.LastPullSubscriptionBinID(uint8(po))
if err != nil {
t.Fatal(err)
}
otherNodeSum += int(shouldUntil)
uploaderSum += int(uploaderUntil)
}
if uploaderSum != otherNodeSum {
t.Fatalf("bin indice sum mismatch. got %d want %d", otherNodeSum, uploaderSum)
}
log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)
}
if total == found && total > 0 {
return nil
}
return fmt.Errorf("Total not equallying found %v: total is %d", found, total)
return nil
})
if result.Error != nil {
@ -218,6 +233,233 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
}
}
// TestStarNetworkSync tests that syncing works on a more elaborate network topology.
// The test creates a star network of nodeCount nodes, which causes the pivot node to have
// a neighbourhood depth > 0. This in turn means that each individual node only gets SOME
// of the chunks that exist on the uploader (pivot) node.
// The test checks that EVERY chunk that exists on the pivot node:
// a. exists on the most proximate node
// b. exists on the nodes subscribed on the corresponding chunk PO
// c. does not exist on the peers that do not have that PO subscription
func TestStarNetworkSync(t *testing.T) {
if testutil.RaceEnabled {
return
}
var (
chunkCount = 500
nodeCount = 6
simTimeout = 60 * time.Second
syncTime = 30 * time.Second
filesize = chunkCount * chunkSize
)
sim := simulation.New(map[string]simulation.ServiceFunc{
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
addr := network.NewAddr(ctx.Config.Node())
netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
if err != nil {
return nil, nil, err
}
var dir string
var store *state.DBStore
if testutil.RaceEnabled {
// Use on-disk DBStore to reduce memory consumption in race tests.
dir, err = ioutil.TempDir("", "swarm-stream-")
if err != nil {
return nil, nil, err
}
store, err = state.NewDBStore(dir)
if err != nil {
return nil, nil, err
}
} else {
store = state.NewInmemoryStore()
}
r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
Syncing: SyncingAutoSubscribe,
SyncUpdateDelay: 200 * time.Millisecond,
SkipCheck: true,
}, nil)
cleanup = func() {
r.Close()
clean()
if dir != "" {
os.RemoveAll(dir)
}
}
return r, cleanup, nil
},
})
defer sim.Close()
// create context for simulation run
ctx, cancel := context.WithTimeout(context.Background(), simTimeout)
// defer cancel should come before defer simulation teardown
defer cancel()
_, err := sim.AddNodesAndConnectStar(nodeCount)
if err != nil {
t.Fatal(err)
}
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
nodeIDs := sim.UpNodeIDs()
nodeIndex := make(map[enode.ID]int)
for i, id := range nodeIDs {
nodeIndex[id] = i
}
disconnected := watchDisconnections(ctx, sim)
defer func() {
if err != nil && disconnected.bool() {
err = errors.New("disconnect events received")
}
}()
seed := int(time.Now().Unix())
randomBytes := testutil.RandomBytes(seed, filesize)
chunkAddrs, err := getAllRefs(randomBytes[:])
if err != nil {
return err
}
chunksProx := make([]chunkProxData, 0)
for _, chunkAddr := range chunkAddrs {
chunkInfo := chunkProxData{
addr: chunkAddr,
uploaderNodePO: chunk.Proximity(nodeIDs[0].Bytes(), chunkAddr),
nodeProximities: make(map[enode.ID]int),
}
closestNodePO := 0
for nodeAddr := range nodeIndex {
po := chunk.Proximity(nodeAddr.Bytes(), chunkAddr)
chunkInfo.nodeProximities[nodeAddr] = po
if po > closestNodePO {
chunkInfo.closestNodePO = po
chunkInfo.closestNode = nodeAddr
}
log.Trace("processed chunk", "uploaderPO", chunkInfo.uploaderNodePO, "ci", chunkInfo.closestNode, "cpo", chunkInfo.closestNodePO, "cadrr", chunkInfo.addr)
}
chunksProx = append(chunksProx, chunkInfo)
}
// get the pivot node and pump some data
item, ok := sim.NodeItem(nodeIDs[0], bucketKeyFileStore)
if !ok {
return fmt.Errorf("No filestore")
}
fileStore := item.(*storage.FileStore)
reader := bytes.NewReader(randomBytes[:])
_, wait1, err := fileStore.Store(ctx, reader, int64(len(randomBytes)), false)
if err != nil {
return fmt.Errorf("fileStore.Store: %v", err)
}
wait1(ctx)
// check that chunks with a marked proximate host are where they should be
count := 0
// wait to sync
time.Sleep(syncTime)
log.Info("checking if chunks are on prox hosts")
for _, c := range chunksProx {
// if the most proximate host is set - check that the chunk is there
if c.closestNodePO > 0 {
count++
log.Trace("found chunk with proximate host set, trying to find in localstore", "po", c.closestNodePO, "closestNode", c.closestNode)
item, ok = sim.NodeItem(c.closestNode, bucketKeyStore)
if !ok {
return fmt.Errorf("No DB")
}
store := item.(chunk.Store)
_, err := store.Get(context.TODO(), chunk.ModeGetRequest, c.addr)
if err != nil {
return err
}
}
}
log.Debug("done checking stores", "checked chunks", count, "total chunks", len(chunksProx))
if count != len(chunksProx) {
return fmt.Errorf("checked chunks dont match numer of chunks. got %d want %d", count, len(chunksProx))
}
// check that chunks from each po are _not_ on nodes that don't have subscriptions for these POs
node := sim.Net.GetNode(nodeIDs[0])
client, err := node.Client()
if err != nil {
return fmt.Errorf("create node 1 rpc client fail: %v", err)
}
//ask it for subscriptions
pstreams := make(map[string][]string)
err = client.Call(&pstreams, "stream_getPeerServerSubscriptions")
if err != nil {
return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err)
}
//create a map of no-subs for a node
noSubMap := make(map[enode.ID]map[int]bool)
for subscribedNode, streams := range pstreams {
id := enode.HexID(subscribedNode)
b := make([]bool, 17)
for _, sub := range streams {
subPO, err := ParseSyncBinKey(strings.Split(sub, "|")[1])
if err != nil {
return err
}
b[int(subPO)] = true
}
noMapMap := make(map[int]bool)
for i, v := range b {
if !v {
noMapMap[i] = true
}
}
noSubMap[id] = noMapMap
}
// iterate over noSubMap, for each node check if it has any of the chunks it shouldn't have
for nodeId, nodeNoSubs := range noSubMap {
for _, c := range chunksProx {
// if the chunk PO is one of the bins this node is not subscribed to - make sure the node does not have the chunk
if _, ok := nodeNoSubs[c.uploaderNodePO]; ok {
count++
item, ok = sim.NodeItem(nodeId, bucketKeyStore)
if !ok {
return fmt.Errorf("No DB")
}
store := item.(chunk.Store)
_, err := store.Get(context.TODO(), chunk.ModeGetRequest, c.addr)
if err == nil {
return fmt.Errorf("got a chunk where it shouldn't be! addr %s, nodeId %s", c.addr, nodeId)
}
}
}
}
return nil
})
if result.Error != nil {
t.Fatal(result.Error)
}
}
type chunkProxData struct {
addr chunk.Address
uploaderNodePO int
nodeProximities map[enode.ID]int
closestNode enode.ID
closestNodePO int
}
//TestSameVersionID just checks that if the version is not changed,
//then streamer peers see each other
func TestSameVersionID(t *testing.T) {
@ -343,5 +585,4 @@ func TestDifferentVersionID(t *testing.T) {
t.Fatal(result.Error)
}
log.Info("Simulation ended")
}
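
As an illustrative aside (not part of this commit), here is a minimal sketch of the bin-index bookkeeping that TestTwoNodesFullSync above relies on: summing LastPullSubscriptionBinID over all 17 proximity-order bins of a node's chunk.Store and comparing the uploader's sum with the syncing peer's. The helper name sumBinIDs is hypothetical; only the chunk.Store calls mirror the test code in this diff.

import "github.com/ethersphere/swarm/chunk"

// sumBinIDs sums the last pull-subscription bin IDs over proximity order
// bins 0..16 for the given store. Two nodes are considered fully synced in
// TestTwoNodesFullSync when the uploader's sum equals the other node's sum.
func sumBinIDs(store chunk.Store) (uint64, error) {
	var sum uint64
	for po := 0; po <= 16; po++ {
		until, err := store.LastPullSubscriptionBinID(uint8(po))
		if err != nil {
			return 0, err
		}
		sum += until
	}
	return sum, nil
}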

View File

@ -61,12 +61,15 @@ func NewFileStoreParams() *FileStoreParams {
}
// for testing locally
func NewLocalFileStore(datadir string, basekey []byte, tags *chunk.Tags) (*FileStore, error) {
func NewLocalFileStore(datadir string, basekey []byte, tags *chunk.Tags) (*FileStore, func(), error) {
localStore, err := localstore.New(datadir, basekey, nil)
if err != nil {
return nil, err
return nil, nil, err
}
return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams(), tags), nil
cleanup := func() {
localStore.Close()
}
return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams(), tags), cleanup, nil
}
func NewFileStore(store ChunkStore, params *FileStoreParams, tags *chunk.Tags) *FileStore {