diff --git a/api/api_test.go b/api/api_test.go
index 878fa3a4b9..b7ff25e0e6 100644
--- a/api/api_test.go
+++ b/api/api_test.go
@@ -53,10 +53,11 @@ func testAPI(t *testing.T, f func(*API, *chunk.Tags, bool)) {
 	}
 	defer os.RemoveAll(datadir)
 	tags := chunk.NewTags()
-	fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
+	fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), tags)
 	if err != nil {
 		return
 	}
+	defer cleanup()
 	api := NewAPI(fileStore, nil, nil, nil, tags)
 	f(api, tags, v)
 }
diff --git a/cmd/swarm-smoke/upload_and_sync.go b/cmd/swarm-smoke/upload_and_sync.go
index 62ef313f0b..741d7d361c 100644
--- a/cmd/swarm-smoke/upload_and_sync.go
+++ b/cmd/swarm-smoke/upload_and_sync.go
@@ -258,10 +258,11 @@ func getAllRefs(testData []byte) (storage.AddressCollection, error) {
 		return nil, fmt.Errorf("unable to create temp dir: %v", err)
 	}
 	defer os.RemoveAll(datadir)
-	fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
+	fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
 	if err != nil {
 		return nil, err
 	}
+	defer cleanup()

 	reader := bytes.NewReader(testData)
 	return fileStore.GetAllReferences(context.Background(), reader, false)
diff --git a/fuse/swarmfs_test.go b/fuse/swarmfs_test.go
index 90c88187f5..a469afed6b 100644
--- a/fuse/swarmfs_test.go
+++ b/fuse/swarmfs_test.go
@@ -1615,10 +1615,11 @@ func TestFUSE(t *testing.T) {
 	}
 	defer os.RemoveAll(datadir)

-	fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
+	fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer cleanup()
 	ta := &testAPI{api: api.NewAPI(fileStore, nil, nil, nil, chunk.NewTags())}

 	//run a short suite of tests
diff --git a/network/stream/common_test.go b/network/stream/common_test.go
index e4bb036dc0..6d7d7d68e5 100644
--- a/network/stream/common_test.go
+++ b/network/stream/common_test.go
@@ -17,6 +17,7 @@
 package stream

 import (
+	"bytes"
 	"context"
 	"errors"
 	"flag"
@@ -44,7 +45,6 @@ import (
 	"github.com/ethersphere/swarm/storage/localstore"
 	"github.com/ethersphere/swarm/storage/mock"
 	"github.com/ethersphere/swarm/testutil"
-	colorable "github.com/mattn/go-colorable"
 )

 var (
@@ -69,7 +69,7 @@ func init() {
 	rand.Seed(time.Now().UnixNano())

 	log.PrintOrigins(true)
-	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(colorable.NewColorableStderr(), log.TerminalFormat(true))))
+	log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(*loglevel), log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
 }

 // newNetStoreAndDelivery is a default constructor for BzzAddr, NetStore and Delivery, used in Simulations
@@ -399,3 +399,19 @@
 func (b *boolean) bool() bool {
 	return b.v
 }
+
+func getAllRefs(testData []byte) (storage.AddressCollection, error) {
+	datadir, err := ioutil.TempDir("", "chunk-debug")
+	if err != nil {
+		return nil, fmt.Errorf("unable to create temp dir: %v", err)
+	}
+	defer os.RemoveAll(datadir)
+	fileStore, cleanup, err := storage.NewLocalFileStore(datadir, make([]byte, 32), chunk.NewTags())
+	if err != nil {
+		return nil, err
+	}
+	defer cleanup()
+
+	reader := bytes.NewReader(testData)
+	return fileStore.GetAllReferences(context.Background(), reader, false)
+}
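
The getAllRefs helper added to common_test.go above builds a throwaway local FileStore just to enumerate the chunk references of a test payload. The sketch below is illustrative only and not part of the patch (the test name and the zero base address are made up); it shows how a test in this package could combine getAllRefs with chunk.Proximity to see which kademlia bin each chunk would fall into:

func TestGetAllRefsSketch(t *testing.T) {
	data := testutil.RandomBytes(1, 4*chunkSize) // a few chunks of deterministic random data
	refs, err := getAllRefs(data)
	if err != nil {
		t.Fatal(err)
	}
	if len(refs) == 0 {
		t.Fatal("expected at least one chunk reference")
	}
	base := make([]byte, 32) // hypothetical 32-byte overlay address
	for _, ref := range refs {
		t.Logf("ref %x po %d", ref, chunk.Proximity(base, ref))
	}
}
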
diff --git a/network/stream/syncer_test.go b/network/stream/syncer_test.go
index 3b56006771..1774547191 100644
--- a/network/stream/syncer_test.go
+++ b/network/stream/syncer_test.go
@@ -17,11 +17,13 @@
 package stream

 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
 	"io/ioutil"
 	"os"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -38,28 +40,23 @@ import (
 	"github.com/ethersphere/swarm/testutil"
 )

-const dataChunkCount = 200
-
-func TestSyncerSimulation(t *testing.T) {
-	testSyncBetweenNodes(t, 2, dataChunkCount, true, 1)
-	// This test uses much more memory when running with
-	// race detector. Allow it to finish successfully by
-	// reducing its scope, and still check for data races
-	// with the smallest number of nodes.
-	if !testutil.RaceEnabled {
-		testSyncBetweenNodes(t, 4, dataChunkCount, true, 1)
-		testSyncBetweenNodes(t, 8, dataChunkCount, true, 1)
-		testSyncBetweenNodes(t, 16, dataChunkCount, true, 1)
-	}
-}
-
-func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, po uint8) {
+const dataChunkCount = 1000
+// TestTwoNodesFullSync connects two nodes, uploads content to one node and expects the
+// uploader node's chunks to be synced to the second node. This is expected behaviour,
+// since with both nodes under ProxBinSize the kademlia depth is 0, so subscriptions are
+// eventually created on all bins between the two nodes, causing a full sync between them.
+// The test checks that:
+// 1. All subscriptions are created
+// 2. All chunks are transferred from one node to another (asserted by summing and comparing bin indexes on both nodes)
+func TestTwoNodesFullSync(t *testing.T) {
+	var (
+		chunkCount = 1000 //~4mb
+		syncTime   = 5 * time.Second
+	)

 	sim := simulation.New(map[string]simulation.ServiceFunc{
 		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
 			addr := network.NewAddr(ctx.Config.Node())
-			//hack to put addresses in same space
-			addr.OAddr[0] = byte(0)

 			netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
 			if err != nil {
@@ -83,8 +80,9 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 			}

 			r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
-				Syncing:   SyncingAutoSubscribe,
-				SkipCheck: skipCheck,
+				Syncing:         SyncingAutoSubscribe,
+				SyncUpdateDelay: 500 * time.Millisecond, // this is needed to trigger the update subscriptions loop
+				SkipCheck:       true,
 			}, nil)

 			cleanup = func() {
@@ -106,12 +104,16 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 	// defer cancel should come before defer simulation teardown
 	defer cancel()

-	_, err := sim.AddNodesAndConnectChain(nodes)
+	_, err := sim.AddNodesAndConnectChain(2)
 	if err != nil {
 		t.Fatal(err)
 	}
+
 	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
 		nodeIDs := sim.UpNodeIDs()
+		if len(nodeIDs) != 2 {
+			return errors.New("not enough nodes up")
+		}

 		nodeIndex := make(map[enode.ID]int)
 		for i, id := range nodeIDs {
@@ -125,92 +127,105 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 			}
 		}()

-		// each node Subscribes to each other's swarmChunkServerStreamName
-		for j := 0; j < nodes-1; j++ {
-			id := nodeIDs[j]
-			client, err := sim.Net.GetNode(id).Client()
+		item, ok := sim.NodeItem(nodeIDs[0], bucketKeyFileStore)
+		if !ok {
+			return fmt.Errorf("No filestore")
+		}
+		fileStore := item.(*storage.FileStore)
+		size := chunkCount * chunkSize
+
+		_, wait1, err := fileStore.Store(ctx, testutil.RandomReader(0, size), int64(size), false)
+		if err != nil {
+			return fmt.Errorf("fileStore.Store: %v", err)
+		}
+
+		_, wait2, err := fileStore.Store(ctx, testutil.RandomReader(10, size), int64(size), false)
+		if err != nil {
+			return fmt.Errorf("fileStore.Store: %v", err)
+		}
+
+		wait1(ctx)
+		wait2(ctx)
+		time.Sleep(1 * time.Second)
+
+		//explicitly check that all subscriptions are there on all bins
+		for idx, id := range nodeIDs {
+			node := sim.Net.GetNode(id)
+			client, err := node.Client()
 			if err != nil {
-				return fmt.Errorf("node %s client: %v", id, err)
+				return fmt.Errorf("create node %d rpc client fail: %v", idx, err)
 			}
-			sid := nodeIDs[j+1]
-			client.CallContext(ctx, nil, "stream_subscribeStream", sid, NewStream("SYNC", FormatSyncBinKey(1), false), NewRange(0, 0), Top)
+
+			//ask it for subscriptions
+			pstreams := make(map[string][]string)
+			err = client.Call(&pstreams, "stream_getPeerServerSubscriptions")
 			if err != nil {
-				return err
+				return fmt.Errorf("client call stream_getPeerServerSubscriptions: %v", err)
 			}
-			if j > 0 || nodes == 2 {
-				item, ok := sim.NodeItem(nodeIDs[j], bucketKeyFileStore)
-				if !ok {
-					return fmt.Errorf("No filestore")
+			for _, streams := range pstreams {
+				b := make([]bool, 17)
+				for _, sub := range streams {
+					subPO, err := ParseSyncBinKey(strings.Split(sub, "|")[1])
+					if err != nil {
+						return err
+					}
+					b[int(subPO)] = true
 				}
-				fileStore := item.(*storage.FileStore)
-				size := chunkCount * chunkSize
-				_, wait, err := fileStore.Store(ctx, testutil.RandomReader(j, size), int64(size), false)
-				if err != nil {
-					return fmt.Errorf("fileStore.Store: %v", err)
+				for bin, v := range b {
+					if !v {
+						return fmt.Errorf("did not find any subscriptions for node %d on bin %d", idx, bin)
+					}
 				}
-				wait(ctx)
 			}
 		}

-		// here we distribute chunks of a random file into stores 1...nodes
-		// collect hashes in po 1 bin for each node
-		hashes := make([][]storage.Address, nodes)
-		totalHashes := 0
-		hashCounts := make([]int, nodes)
-		for i := nodes - 1; i >= 0; i-- {
-			if i < nodes-1 {
-				hashCounts[i] = hashCounts[i+1]
+		log.Debug("subscriptions on all bins exist between the two nodes, proceeding to check bin indexes")
+		log.Debug("uploader node", "enode", nodeIDs[0])
+		item, ok = sim.NodeItem(nodeIDs[0], bucketKeyStore)
+		if !ok {
+			return fmt.Errorf("No DB")
+		}
+		store := item.(chunk.Store)
+		uploaderNodeBinIDs := make([]uint64, 17)
+
+		log.Debug("checking pull subscription bin ids")
+		for po := 0; po <= 16; po++ {
+			until, err := store.LastPullSubscriptionBinID(uint8(po))
+			if err != nil {
+				t.Fatal(err)
 			}
-			item, ok := sim.NodeItem(nodeIDs[i], bucketKeyStore)
+
+			uploaderNodeBinIDs[po] = until
+		}
+		// wait for syncing
+		time.Sleep(syncTime)
+
+		// check that the sum of bin indexes is equal
+		for idx := range nodeIDs {
+			if nodeIDs[idx] == nodeIDs[0] {
+				continue
+			}
+
+			log.Debug("compare to", "enode", nodeIDs[idx])
+			item, ok = sim.NodeItem(nodeIDs[idx], bucketKeyStore)
 			if !ok {
 				return fmt.Errorf("No DB")
 			}
-			store := item.(chunk.Store)
-			until, err := store.LastPullSubscriptionBinID(po)
-			if err != nil {
-				return err
-			}
-			if until > 0 {
-				c, _ := store.SubscribePull(ctx, po, 0, until)
-				for iterate := true; iterate; {
-					select {
-					case cd, ok := <-c:
-						if !ok {
-							iterate = false
-							break
-						}
-						hashes[i] = append(hashes[i], cd.Address)
-						totalHashes++
-						hashCounts[i]++
-					case <-ctx.Done():
-						return ctx.Err()
-					}
-				}
-			}
-		}
-		var total, found int
-		for _, node := range nodeIDs {
-			i := nodeIndex[node]
+			db := item.(chunk.Store)

-			for j := i; j < nodes; j++ {
-				total += len(hashes[j])
-				for _, key := range hashes[j] {
-					item, ok := sim.NodeItem(nodeIDs[j], bucketKeyStore)
-					if !ok {
-						return fmt.Errorf("No DB")
-					}
-					db := item.(chunk.Store)
-					_, err := db.Get(ctx, chunk.ModeGetRequest, key)
-					if err == nil {
-						found++
-					}
+			uploaderSum, otherNodeSum := 0, 0
+			for po, uploaderUntil := range uploaderNodeBinIDs {
+				shouldUntil, err := db.LastPullSubscriptionBinID(uint8(po))
+				if err != nil {
+					t.Fatal(err)
 				}
+				otherNodeSum += int(shouldUntil)
+				uploaderSum += int(uploaderUntil)
+			}
+			if uploaderSum != otherNodeSum {
+				t.Fatalf("bin index sum mismatch. got %d want %d", otherNodeSum, uploaderSum)
 			}
-			log.Debug("sync check", "node", node, "index", i, "bin", po, "found", found, "total", total)
 		}
-		if total == found && total > 0 {
-			return nil
-		}
-		return fmt.Errorf("Total not equallying found %v: total is %d", found, total)
+		return nil
 	})
 	if result.Error != nil {
@@ -218,6 +233,233 @@ func testSyncBetweenNodes(t *testing.T, nodes, chunkCount int, skipCheck bool, p
 	}
 }

+// TestStarNetworkSync tests that syncing works on a more elaborate network topology.
+// The test creates a network of nodeCount (6) nodes and connects them in a star topology, which causes
+// the pivot node to have neighbourhood depth > 0, which in turn means that each individual node
+// will only get SOME of the chunks that exist on the uploader node (the pivot node).
+// The test checks that EVERY chunk that exists on the pivot node:
+// a. exists on the most proximate node
+// b. exists on the nodes subscribed on the corresponding chunk PO
+// c. does not exist on the peers that do not have that PO subscription
+func TestStarNetworkSync(t *testing.T) {
+	if testutil.RaceEnabled {
+		return
+	}
+	var (
+		chunkCount = 500
+		nodeCount  = 6
+		simTimeout = 60 * time.Second
+		syncTime   = 30 * time.Second
+		filesize   = chunkCount * chunkSize
+	)
+	sim := simulation.New(map[string]simulation.ServiceFunc{
+		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+			addr := network.NewAddr(ctx.Config.Node())
+
+			netStore, delivery, clean, err := newNetStoreAndDeliveryWithBzzAddr(ctx, bucket, addr)
+			if err != nil {
+				return nil, nil, err
+			}
+
+			var dir string
+			var store *state.DBStore
+			if testutil.RaceEnabled {
+				// Use on-disk DBStore to reduce memory consumption in race tests.
+				dir, err = ioutil.TempDir("", "swarm-stream-")
+				if err != nil {
+					return nil, nil, err
+				}
+				store, err = state.NewDBStore(dir)
+				if err != nil {
+					return nil, nil, err
+				}
+			} else {
+				store = state.NewInmemoryStore()
+			}
+
+			r := NewRegistry(addr.ID(), delivery, netStore, store, &RegistryOptions{
+				Syncing:         SyncingAutoSubscribe,
+				SyncUpdateDelay: 200 * time.Millisecond,
+				SkipCheck:       true,
+			}, nil)
+
+			cleanup = func() {
+				r.Close()
+				clean()
+				if dir != "" {
+					os.RemoveAll(dir)
+				}
+			}
+
+			return r, cleanup, nil
+		},
+	})
+	defer sim.Close()
+
+	// create context for simulation run
+	ctx, cancel := context.WithTimeout(context.Background(), simTimeout)
+	// defer cancel should come before defer simulation teardown
+	defer cancel()
+	_, err := sim.AddNodesAndConnectStar(nodeCount)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
+		nodeIDs := sim.UpNodeIDs()
+
+		nodeIndex := make(map[enode.ID]int)
+		for i, id := range nodeIDs {
+			nodeIndex[id] = i
+		}
+		disconnected := watchDisconnections(ctx, sim)
+		defer func() {
+			if err != nil && disconnected.bool() {
+				err = errors.New("disconnect events received")
+			}
+		}()
+		seed := int(time.Now().Unix())
+		randomBytes := testutil.RandomBytes(seed, filesize)
+
+		chunkAddrs, err := getAllRefs(randomBytes[:])
+		if err != nil {
+			return err
+		}
+		chunksProx := make([]chunkProxData, 0)
+		for _, chunkAddr := range chunkAddrs {
+			chunkInfo := chunkProxData{
+				addr:            chunkAddr,
+				uploaderNodePO:  chunk.Proximity(nodeIDs[0].Bytes(), chunkAddr),
+				nodeProximities: make(map[enode.ID]int),
+			}
+			// track the closest (highest proximity order) node seen so far
+			for nodeAddr := range nodeIndex {
+				po := chunk.Proximity(nodeAddr.Bytes(), chunkAddr)
+
+				chunkInfo.nodeProximities[nodeAddr] = po
+				if po > chunkInfo.closestNodePO {
+					chunkInfo.closestNodePO = po
+					chunkInfo.closestNode = nodeAddr
+				}
+				log.Trace("processed chunk", "uploaderPO", chunkInfo.uploaderNodePO, "closestNode", chunkInfo.closestNode, "closestPO", chunkInfo.closestNodePO, "addr", chunkInfo.addr)
+			}
+			chunksProx = append(chunksProx, chunkInfo)
+		}
+
+		// get the pivot node and pump some data
+		item, ok := sim.NodeItem(nodeIDs[0], bucketKeyFileStore)
+		if !ok {
+			return fmt.Errorf("No filestore")
+		}
+		fileStore := item.(*storage.FileStore)
+		reader := bytes.NewReader(randomBytes[:])
+		_, wait1, err := fileStore.Store(ctx, reader, int64(len(randomBytes)), false)
+		if err != nil {
+			return fmt.Errorf("fileStore.Store: %v", err)
+		}
+
+		wait1(ctx)
+
+		// check that chunks with a marked proximate host are where they should be
+		count := 0
+
+		// wait to sync
+		time.Sleep(syncTime)
+
+		log.Info("checking if chunks are on prox hosts")
+		for _, c := range chunksProx {
+			// if the most proximate host is set - check that the chunk is there
+			if c.closestNodePO > 0 {
+				count++
+				log.Trace("found chunk with proximate host set, trying to find in localstore", "po", c.closestNodePO, "closestNode", c.closestNode)
+				item, ok = sim.NodeItem(c.closestNode, bucketKeyStore)
+				if !ok {
+					return fmt.Errorf("No DB")
+				}
+				store := item.(chunk.Store)
+
+				_, err := store.Get(context.TODO(), chunk.ModeGetRequest, c.addr)
+				if err != nil {
+					return err
+				}
+			}
+		}
+		log.Debug("done checking stores", "checked chunks", count, "total chunks", len(chunksProx))
+		if count != len(chunksProx) {
+			return fmt.Errorf("checked chunks don't match number of chunks. got %d want %d", count, len(chunksProx))
+		}
+
+		// check that chunks from each po are _not_ on nodes that don't have subscriptions for these POs
+		node := sim.Net.GetNode(nodeIDs[0])
+		client, err := node.Client()
+		if err != nil {
+			return fmt.Errorf("create node 1 rpc client fail: %v", err)
+		}
+
+		//ask it for subscriptions
+		pstreams := make(map[string][]string)
+		err = client.Call(&pstreams, "stream_getPeerServerSubscriptions")
+		if err != nil {
+			return fmt.Errorf("client call stream_getPeerServerSubscriptions: %v", err)
+		}
+
+		//create a map of no-subs for a node
+		noSubMap := make(map[enode.ID]map[int]bool)
+
+		for subscribedNode, streams := range pstreams {
+			id := enode.HexID(subscribedNode)
+			b := make([]bool, 17)
+			for _, sub := range streams {
+				subPO, err := ParseSyncBinKey(strings.Split(sub, "|")[1])
+				if err != nil {
+					return err
+				}
+				b[int(subPO)] = true
+			}
+			noMapMap := make(map[int]bool)
+			for i, v := range b {
+				if !v {
+					noMapMap[i] = true
+				}
+			}
+			noSubMap[id] = noMapMap
+		}
+
+		// iterate over noSubMap, for each node check if it has any of the chunks it shouldn't have
+		for nodeId, nodeNoSubs := range noSubMap {
+			for _, c := range chunksProx {
+				// if the chunk PO equals a bin the node is not subscribed to - check that the node does not have the chunk!
+				if _, ok := nodeNoSubs[c.uploaderNodePO]; ok {
+					count++
+					item, ok = sim.NodeItem(nodeId, bucketKeyStore)
+					if !ok {
+						return fmt.Errorf("No DB")
+					}
+					store := item.(chunk.Store)
+
+					_, err := store.Get(context.TODO(), chunk.ModeGetRequest, c.addr)
+					if err == nil {
+						return fmt.Errorf("got a chunk where it shouldn't be! addr %s, nodeId %s", c.addr, nodeId)
+					}
+				}
+			}
+		}
+		return nil
+	})
+
+	if result.Error != nil {
+		t.Fatal(result.Error)
+	}
+}
+
+type chunkProxData struct {
+	addr            chunk.Address
+	uploaderNodePO  int
+	nodeProximities map[enode.ID]int
+	closestNode     enode.ID
+	closestNodePO   int
+}
+
 //TestSameVersionID just checks that if the version is not changed,
 //then streamer peers see each other
 func TestSameVersionID(t *testing.T) {
@@ -343,5 +585,4 @@ func TestDifferentVersionID(t *testing.T) {
 		t.Fatal(result.Error)
 	}
 	log.Info("Simulation ended")
-
 }
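
Both new tests derive the set of SYNC bins a peer serves by splitting each stream descriptor on "|" and feeding the second field to ParseSyncBinKey. A small helper along these lines could factor that out; this is a sketch only, not part of the patch, and it assumes descriptors of the form "SYNC|<bin key>" and that ParseSyncBinKey returns (uint8, error), as the tests above do:

func subscribedBins(streams []string) (map[uint8]bool, error) {
	bins := make(map[uint8]bool)
	for _, s := range streams {
		parts := strings.Split(s, "|")
		if len(parts) != 2 {
			return nil, fmt.Errorf("unexpected stream descriptor: %q", s)
		}
		po, err := ParseSyncBinKey(parts[1])
		if err != nil {
			return nil, err
		}
		bins[po] = true
	}
	return bins, nil
}
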
got %d want %d", count, len(chunksProx)) + } + + // check that chunks from each po are _not_ on nodes that don't have subscriptions for these POs + node := sim.Net.GetNode(nodeIDs[0]) + client, err := node.Client() + if err != nil { + return fmt.Errorf("create node 1 rpc client fail: %v", err) + } + + //ask it for subscriptions + pstreams := make(map[string][]string) + err = client.Call(&pstreams, "stream_getPeerServerSubscriptions") + if err != nil { + return fmt.Errorf("client call stream_getPeerSubscriptions: %v", err) + } + + //create a map of no-subs for a node + noSubMap := make(map[enode.ID]map[int]bool) + + for subscribedNode, streams := range pstreams { + id := enode.HexID(subscribedNode) + b := make([]bool, 17) + for _, sub := range streams { + subPO, err := ParseSyncBinKey(strings.Split(sub, "|")[1]) + if err != nil { + return err + } + b[int(subPO)] = true + } + noMapMap := make(map[int]bool) + for i, v := range b { + if !v { + noMapMap[i] = true + } + } + noSubMap[id] = noMapMap + } + + // iterate over noSubMap, for each node check if it has any of the chunks it shouldn't have + for nodeId, nodeNoSubs := range noSubMap { + for _, c := range chunksProx { + // if the chunk PO is equal to the sub that the node shouldnt have - check if the node has the chunk! + if _, ok := nodeNoSubs[c.uploaderNodePO]; ok { + count++ + item, ok = sim.NodeItem(nodeId, bucketKeyStore) + if !ok { + return fmt.Errorf("No DB") + } + store := item.(chunk.Store) + + _, err := store.Get(context.TODO(), chunk.ModeGetRequest, c.addr) + if err == nil { + return fmt.Errorf("got a chunk where it shouldn't be! addr %s, nodeId %s", c.addr, nodeId) + } + } + } + } + return nil + }) + + if result.Error != nil { + t.Fatal(result.Error) + } +} + +type chunkProxData struct { + addr chunk.Address + uploaderNodePO int + nodeProximities map[enode.ID]int + closestNode enode.ID + closestNodePO int +} + //TestSameVersionID just checks that if the version is not changed, //then streamer peers see each other func TestSameVersionID(t *testing.T) { @@ -343,5 +585,4 @@ func TestDifferentVersionID(t *testing.T) { t.Fatal(result.Error) } log.Info("Simulation ended") - } diff --git a/storage/filestore.go b/storage/filestore.go index 7e805a5762..2c2a625a18 100644 --- a/storage/filestore.go +++ b/storage/filestore.go @@ -61,12 +61,15 @@ func NewFileStoreParams() *FileStoreParams { } // for testing locally -func NewLocalFileStore(datadir string, basekey []byte, tags *chunk.Tags) (*FileStore, error) { +func NewLocalFileStore(datadir string, basekey []byte, tags *chunk.Tags) (*FileStore, func(), error) { localStore, err := localstore.New(datadir, basekey, nil) if err != nil { - return nil, err + return nil, nil, err } - return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams(), tags), nil + cleanup := func() { + localStore.Close() + } + return NewFileStore(chunk.NewValidatorStore(localStore, NewContentAddressValidator(MakeHashFunc(DefaultHash))), NewFileStoreParams(), tags), cleanup, nil } func NewFileStore(store ChunkStore, params *FileStoreParams, tags *chunk.Tags) *FileStore {