diff --git a/api/inspector.go b/api/inspector.go index 71ec3c3a19..ab80568995 100644 --- a/api/inspector.go +++ b/api/inspector.go @@ -51,7 +51,7 @@ func (inspector *Inspector) ListKnown() []string { return res } -func (inspector *Inspector) IsSyncing() bool { +func (inspector *Inspector) IsPullSyncing() bool { lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil) // last received chunks msg time diff --git a/client/bzz.go b/client/bzz.go new file mode 100644 index 0000000000..0501f844e9 --- /dev/null +++ b/client/bzz.go @@ -0,0 +1,81 @@ +package client + +import ( + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethersphere/swarm" + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/storage" +) + +type Bzz struct { + client *rpc.Client +} + +// NewBzz is a constructor for a Bzz API +func NewBzz(client *rpc.Client) *Bzz { + return &Bzz{ + client: client, + } +} + +// GetChunksBitVector returns a bit vector of presence for a given slice of chunks +func (b *Bzz) GetChunksBitVector(addrs []storage.Address) (string, error) { + var hostChunks string + const trackChunksPageSize = 7500 + + for len(addrs) > 0 { + var pageChunks string + // get current page size, so that we avoid a slice out of bounds on the last page + pagesize := trackChunksPageSize + if len(addrs) < trackChunksPageSize { + pagesize = len(addrs) + } + + err := b.client.Call(&pageChunks, "bzz_has", addrs[:pagesize]) + if err != nil { + return "", err + } + hostChunks += pageChunks + addrs = addrs[pagesize:] + } + + return hostChunks, nil +} + +// GetBzzAddr returns the bzzAddr of the node +func (b *Bzz) GetBzzAddr() (string, error) { + var info swarm.Info + + err := b.client.Call(&info, "bzz_info") + if err != nil { + return "", err + } + + return info.BzzKey[2:], nil +} + +// IsPullSyncing is checking if the node is still receiving chunk deliveries due to pull syncing +func (b *Bzz) IsPullSyncing() (bool, error) { + var isSyncing bool + + err := b.client.Call(&isSyncing, "bzz_isPullSyncing") + if err != nil { + log.Error("error calling host for isPullSyncing", "err", err) + return false, err + } + + return isSyncing, nil +} + +// IsPushSynced checks if the given `tag` is done syncing, i.e. we've received receipts for all chunks +func (b *Bzz) IsPushSynced(tagname string) (bool, error) { + var isSynced bool + + err := b.client.Call(&isSynced, "bzz_isPushSynced", tagname) + if err != nil { + log.Error("error calling host for isPushSynced", "err", err) + return false, err + } + + return isSynced, nil +} diff --git a/cmd/swarm-smoke/upload_and_sync.go b/cmd/swarm-smoke/upload_and_sync.go index c3ae7d1c86..818ebf53c2 100644 --- a/cmd/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm-smoke/upload_and_sync.go @@ -24,7 +24,6 @@ import ( "io/ioutil" "math/rand" "os" - "strings" "sync" "sync/atomic" "time" @@ -33,9 +32,11 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc" "github.com/ethersphere/swarm/chunk" + "github.com/ethersphere/swarm/client" "github.com/ethersphere/swarm/storage" "github.com/ethersphere/swarm/testutil" "github.com/pborman/uuid" + "golang.org/x/sync/errgroup" cli "gopkg.in/urfave/cli.v1" ) @@ -117,14 +118,16 @@ func trackChunks(testData []byte, submitMetrics bool) error { return } - hostChunks, err := getChunksBitVectorFromHost(rpcClient, addrs) + bzzClient := client.NewBzz(rpcClient) + + hostChunks, err := bzzClient.GetChunksBitVector(addrs) if err != nil { log.Error("error getting chunks bit vector from host", "err", err, "host", httpHost) hasErr = true return } - bzzAddr, err := getBzzAddrFromHost(rpcClient) + bzzAddr, err := bzzClient.GetBzzAddr() if err != nil { log.Error("error getting bzz addrs from host", "err", err, "host", httpHost) hasErr = true @@ -176,46 +179,6 @@ func trackChunks(testData []byte, submitMetrics bool) error { return nil } -// getChunksBitVectorFromHost returns a bit vector of presence for a given slice of chunks from a given host -func getChunksBitVectorFromHost(client *rpc.Client, addrs []storage.Address) (string, error) { - var hostChunks string - const trackChunksPageSize = 7500 - - for len(addrs) > 0 { - var pageChunks string - // get current page size, so that we avoid a slice out of bounds on the last page - pagesize := trackChunksPageSize - if len(addrs) < trackChunksPageSize { - pagesize = len(addrs) - } - - err := client.Call(&pageChunks, "bzz_has", addrs[:pagesize]) - if err != nil { - return "", err - } - hostChunks += pageChunks - addrs = addrs[pagesize:] - } - - return hostChunks, nil -} - -// getBzzAddrFromHost returns the bzzAddr for a given host -func getBzzAddrFromHost(client *rpc.Client) (string, error) { - var hive string - - err := client.Call(&hive, "bzz_hive") - if err != nil { - return "", err - } - - // we make an ugly assumption about the output format of the hive.String() method - // ideally we should replace this with an API call that returns the bzz addr for a given host, - // but this also works for now (provided we don't change the hive.String() method, which we haven't in some time - ss := strings.Split(strings.Split(hive, "\n")[3], " ") - return ss[len(ss)-1], nil -} - // checkChunksVsMostProxHosts is checking: // 1. whether a chunk has been found at less than 2 hosts. Considering our NN size, this should not happen. // 2. if a chunk is not found at its closest node. This should also not happen. @@ -369,63 +332,30 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error { return nil } -func isPushSynced(wsHost string, tagname string) (bool, error) { - rpcClient, err := rpc.Dial(wsHost) - if rpcClient != nil { - defer rpcClient.Close() - } - - if err != nil { - log.Error("error dialing host", "err", err) - return false, err - } - - var isSynced bool - err = rpcClient.Call(&isSynced, "bzz_isPushSynced", tagname) - if err != nil { - log.Error("error calling host for isPushSynced", "err", err) - return false, err - } - - log.Debug("isSynced result", "host", wsHost, "isSynced", isSynced) - - return isSynced, nil -} - -func isSyncing(wsHost string) (bool, error) { - rpcClient, err := rpc.Dial(wsHost) - if rpcClient != nil { - defer rpcClient.Close() - } - - if err != nil { - log.Error("error dialing host", "err", err) - return false, err - } - - var isSyncing bool - err = rpcClient.Call(&isSyncing, "bzz_isSyncing") - if err != nil { - log.Error("error calling host for isSyncing", "err", err) - return false, err - } - - log.Debug("isSyncing result", "host", wsHost, "isSyncing", isSyncing) - - return isSyncing, nil -} - func waitToPushSynced(tagname string) { for { - synced, err := isPushSynced(wsEndpoint(hosts[0]), tagname) + time.Sleep(200 * time.Millisecond) + + rpcClient, err := rpc.Dial(wsEndpoint(hosts[0])) + if rpcClient != nil { + defer rpcClient.Close() + } + if err != nil { + log.Error("error dialing host", "err", err) + continue + } + + bzzClient := client.NewBzz(rpcClient) + + synced, err := bzzClient.IsPushSynced(tagname) if err != nil { log.Error(err.Error()) + continue } if synced { return } - time.Sleep(200 * time.Millisecond) } } @@ -438,22 +368,39 @@ func waitToSync() { time.Sleep(3 * time.Second) notSynced := uint64(0) - var wg sync.WaitGroup - wg.Add(len(hosts)) + + var g errgroup.Group for i := 0; i < len(hosts); i++ { i := i - go func(idx int) { - stillSyncing, err := isSyncing(wsEndpoint(hosts[idx])) + g.Go(func() error { + rpcClient, err := rpc.Dial(wsEndpoint(hosts[i])) + if rpcClient != nil { + defer rpcClient.Close() + } + if err != nil { + log.Error("error dialing host", "err", err) + return err + } - if stillSyncing || err != nil { + bzzClient := client.NewBzz(rpcClient) + + stillSyncing, err := bzzClient.IsPullSyncing() + if err != nil { + return err + } + + if stillSyncing { atomic.AddUint64(¬Synced, 1) } - wg.Done() - }(i) - } - wg.Wait() - ns = atomic.LoadUint64(¬Synced) + return nil + }) + } + + // Wait for all RPC calls to complete. + if err := g.Wait(); err == nil { + ns = atomic.LoadUint64(¬Synced) + } } t2 := time.Since(t1) diff --git a/vendor/golang.org/x/sync/errgroup/errgroup.go b/vendor/golang.org/x/sync/errgroup/errgroup.go new file mode 100644 index 0000000000..9857fe53d3 --- /dev/null +++ b/vendor/golang.org/x/sync/errgroup/errgroup.go @@ -0,0 +1,66 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package errgroup provides synchronization, error propagation, and Context +// cancelation for groups of goroutines working on subtasks of a common task. +package errgroup + +import ( + "context" + "sync" +) + +// A Group is a collection of goroutines working on subtasks that are part of +// the same overall task. +// +// A zero Group is valid and does not cancel on error. +type Group struct { + cancel func() + + wg sync.WaitGroup + + errOnce sync.Once + err error +} + +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all function calls from the Go method have returned, then +// returns the first non-nil error (if any) from them. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the given function in a new goroutine. +// +// The first call to return a non-nil error cancels the group; its error will be +// returned by Wait. +func (g *Group) Go(f func() error) { + g.wg.Add(1) + + go func() { + defer g.wg.Done() + + if err := f(); err != nil { + g.errOnce.Do(func() { + g.err = err + if g.cancel != nil { + g.cancel() + } + }) + } + }() +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 2dcfbba3fd..4df41392a4 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1240,6 +1240,12 @@ "revision": "eb5bcb51f2a31c7d5141d810b70815c05d9c9146", "revisionTime": "2019-04-03T01:06:53Z" }, + { + "checksumSHA1": "iEK5hCRfrkdc1JOJsaiWuymHmeQ=", + "path": "golang.org/x/sync/errgroup", + "revision": "112230192c580c3556b8cee6403af37a4fc5f28c", + "revisionTime": "2019-04-22T22:11:18Z" + }, { "checksumSHA1": "FuQoDr6zh5GsiVVyo3oDZcUVC3c=", "path": "golang.org/x/sync/singleflight",