Stream subscriptions (#18355)
* swarm/network: eachBin now starts at kaddepth for nn * swarm/network: fix Kademlia.EachBin * swarm/network: fix kademlia.EachBin * swarm/network: correct EachBin implementation according to requirements * swarm/network: less addresses simplified tests * swarm: calc kad depth outside loop in EachBin test * swarm/network: removed printResults * swarm/network: cleanup imports * swarm/network: remove kademlia.EachBin; fix RequestSubscriptions and add unit test * swarm/network/stream: address PR comments * swarm/network/stream: package-wide subscriptionFunc * swarm/network/stream: refactor to kad.EachConn
This commit is contained in:
@ -106,43 +106,6 @@ func TestSyncingViaGlobalSync(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSyncingViaDirectSubscribe(t *testing.T) {
|
||||
if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
|
||||
t.Skip("Flaky on mac on travis")
|
||||
}
|
||||
//if nodes/chunks have been provided via commandline,
|
||||
//run the tests with these values
|
||||
if *nodes != 0 && *chunks != 0 {
|
||||
log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
|
||||
err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
} else {
|
||||
var nodeCnt []int
|
||||
var chnkCnt []int
|
||||
//if the `longrunning` flag has been provided
|
||||
//run more test combinations
|
||||
if *longrunning {
|
||||
chnkCnt = []int{1, 8, 32, 256, 1024}
|
||||
nodeCnt = []int{32, 16}
|
||||
} else {
|
||||
//default test
|
||||
chnkCnt = []int{4, 32}
|
||||
nodeCnt = []int{32, 16}
|
||||
}
|
||||
for _, chnk := range chnkCnt {
|
||||
for _, n := range nodeCnt {
|
||||
log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
|
||||
err := testSyncingViaDirectSubscribe(t, chnk, n)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var simServiceMap = map[string]simulation.ServiceFunc{
|
||||
"streamer": streamerFunc,
|
||||
}
|
||||
@ -323,235 +286,6 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio
|
||||
})
|
||||
}
|
||||
|
||||
/*
|
||||
The test generates the given number of chunks
|
||||
|
||||
For every chunk generated, the nearest node addresses
|
||||
are identified, we verify that the nodes closer to the
|
||||
chunk addresses actually do have the chunks in their local stores.
|
||||
|
||||
The test loads a snapshot file to construct the swarm network,
|
||||
assuming that the snapshot file identifies a healthy
|
||||
kademlia network. The snapshot should have 'streamer' in its service list.
|
||||
*/
|
||||
func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
|
||||
|
||||
sim := simulation.New(map[string]simulation.ServiceFunc{
|
||||
"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
|
||||
n := ctx.Config.Node()
|
||||
addr := network.NewAddr(n)
|
||||
store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
bucket.Store(bucketKeyStore, store)
|
||||
localStore := store.(*storage.LocalStore)
|
||||
netStore, err := storage.NewNetStore(localStore, nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
kad := network.NewKademlia(addr.Over(), network.NewKadParams())
|
||||
delivery := NewDelivery(kad, netStore)
|
||||
netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
|
||||
|
||||
r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
|
||||
Retrieval: RetrievalDisabled,
|
||||
Syncing: SyncingRegisterOnly,
|
||||
}, nil)
|
||||
bucket.Store(bucketKeyRegistry, r)
|
||||
|
||||
fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
|
||||
bucket.Store(bucketKeyFileStore, fileStore)
|
||||
|
||||
cleanup = func() {
|
||||
os.RemoveAll(datadir)
|
||||
netStore.Close()
|
||||
r.Close()
|
||||
}
|
||||
|
||||
return r, cleanup, nil
|
||||
|
||||
},
|
||||
})
|
||||
defer sim.Close()
|
||||
|
||||
ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancelSimRun()
|
||||
|
||||
conf := &synctestConfig{}
|
||||
//map of discover ID to indexes of chunks expected at that ID
|
||||
conf.idToChunksMap = make(map[enode.ID][]int)
|
||||
//map of overlay address to discover ID
|
||||
conf.addrToIDMap = make(map[string]enode.ID)
|
||||
//array where the generated chunk hashes will be stored
|
||||
conf.hashes = make([]storage.Address, 0)
|
||||
|
||||
err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := sim.WaitTillHealthy(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
disconnections := sim.PeerEvents(
|
||||
context.Background(),
|
||||
sim.NodeIDs(),
|
||||
simulation.NewPeerEventsFilter().Drop(),
|
||||
)
|
||||
|
||||
var disconnected atomic.Value
|
||||
go func() {
|
||||
for d := range disconnections {
|
||||
if d.Error != nil {
|
||||
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
|
||||
disconnected.Store(true)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
|
||||
nodeIDs := sim.UpNodeIDs()
|
||||
for _, n := range nodeIDs {
|
||||
//get the kademlia overlay address from this ID
|
||||
a := n.Bytes()
|
||||
//append it to the array of all overlay addresses
|
||||
conf.addrs = append(conf.addrs, a)
|
||||
//the proximity calculation is on overlay addr,
|
||||
//the p2p/simulations check func triggers on enode.ID,
|
||||
//so we need to know which overlay addr maps to which nodeID
|
||||
conf.addrToIDMap[string(a)] = n
|
||||
}
|
||||
|
||||
var subscriptionCount int
|
||||
|
||||
filter := simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(4)
|
||||
eventC := sim.PeerEvents(ctx, nodeIDs, filter)
|
||||
|
||||
for j, node := range nodeIDs {
|
||||
log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
|
||||
//start syncing!
|
||||
item, ok := sim.NodeItem(node, bucketKeyRegistry)
|
||||
if !ok {
|
||||
return fmt.Errorf("No registry")
|
||||
}
|
||||
registry := item.(*Registry)
|
||||
|
||||
var cnt int
|
||||
cnt, err = startSyncing(registry, conf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
//increment the number of subscriptions we need to wait for
|
||||
//by the count returned from startSyncing (SYNC subscriptions)
|
||||
subscriptionCount += cnt
|
||||
}
|
||||
|
||||
for e := range eventC {
|
||||
if e.Error != nil {
|
||||
return e.Error
|
||||
}
|
||||
subscriptionCount--
|
||||
if subscriptionCount == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
//select a random node for upload
|
||||
node := sim.Net.GetRandomUpNode()
|
||||
item, ok := sim.NodeItem(node.ID(), bucketKeyStore)
|
||||
if !ok {
|
||||
return fmt.Errorf("No localstore")
|
||||
}
|
||||
lstore := item.(*storage.LocalStore)
|
||||
hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conf.hashes = append(conf.hashes, hashes...)
|
||||
mapKeysToNodes(conf)
|
||||
|
||||
if _, err := sim.WaitTillHealthy(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var globalStore mock.GlobalStorer
|
||||
if *useMockStore {
|
||||
globalStore = mockmem.NewGlobalStore()
|
||||
}
|
||||
// File retrieval check is repeated until all uploaded files are retrieved from all nodes
|
||||
// or until the timeout is reached.
|
||||
REPEAT:
|
||||
for {
|
||||
for _, id := range nodeIDs {
|
||||
//for each expected chunk, check if it is in the local store
|
||||
localChunks := conf.idToChunksMap[id]
|
||||
for _, ch := range localChunks {
|
||||
//get the real chunk by the index in the index array
|
||||
chunk := conf.hashes[ch]
|
||||
log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
|
||||
//check if the expected chunk is indeed in the localstore
|
||||
var err error
|
||||
if *useMockStore {
|
||||
//use the globalStore if the mockStore should be used; in that case,
|
||||
//the complete localStore stack is bypassed for getting the chunk
|
||||
_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
|
||||
} else {
|
||||
//use the actual localstore
|
||||
item, ok := sim.NodeItem(id, bucketKeyStore)
|
||||
if !ok {
|
||||
return fmt.Errorf("Error accessing localstore")
|
||||
}
|
||||
lstore := item.(*storage.LocalStore)
|
||||
_, err = lstore.Get(ctx, chunk)
|
||||
}
|
||||
if err != nil {
|
||||
log.Debug(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
|
||||
// Do not get crazy with logging the warn message
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
continue REPEAT
|
||||
}
|
||||
log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
})
|
||||
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
|
||||
if yes, ok := disconnected.Load().(bool); ok && yes {
|
||||
t.Fatal("disconnect events received")
|
||||
}
|
||||
log.Info("Simulation ended")
|
||||
return nil
|
||||
}
|
||||
|
||||
//the server func to start syncing
|
||||
//issues `RequestSubscriptionMsg` to peers, based on po, by iterating over
|
||||
//the kademlia's `EachBin` function.
|
||||
//returns the number of subscriptions requested
|
||||
func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
|
||||
var err error
|
||||
kad := r.delivery.kad
|
||||
subCnt := 0
|
||||
//iterate over each bin and solicit needed subscription to bins
|
||||
kad.EachBin(r.addr[:], pof, 0, func(conn *network.Peer, po int) bool {
|
||||
//identify begin and start index of the bin(s) we want to subscribe to
|
||||
subCnt++
|
||||
err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
|
||||
return false
|
||||
}
|
||||
return true
|
||||
|
||||
})
|
||||
return subCnt, nil
|
||||
}
|
||||
|
||||
//map chunk keys to addresses which are responsible
|
||||
func mapKeysToNodes(conf *synctestConfig) {
|
||||
nodemap := make(map[string][]int)
|
||||
|
Reference in New Issue
Block a user