From 48857ae88efbf449edc6b9e3e270b8a79911c6a5 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Wed, 17 Jul 2019 16:21:50 +0200 Subject: [PATCH] swarm-smoke: fix check max prox hosts for pull/push sync modes (#1578) --- cmd/swarm-smoke/main.go | 38 ++++++++----- cmd/swarm-smoke/upload_and_sync.go | 88 ++++++++++++++++++++++++++---- cmd/swarm-smoke/util.go | 12 +++- 3 files changed, 112 insertions(+), 26 deletions(-) diff --git a/cmd/swarm-smoke/main.go b/cmd/swarm-smoke/main.go index 415eebe122..91e4afc634 100644 --- a/cmd/swarm-smoke/main.go +++ b/cmd/swarm-smoke/main.go @@ -37,22 +37,23 @@ var ( ) var ( - allhosts string - hosts []string - filesize int - syncDelay bool - inputSeed int - httpPort int - wsPort int - verbosity int - timeout int - single bool - onlyUpload bool - debug bool + allhosts string + hosts []string + filesize int + syncDelay bool + pushsyncDelay bool + syncMode string + inputSeed int + httpPort int + wsPort int + verbosity int + timeout int + single bool + onlyUpload bool + debug bool ) func main() { - app := cli.NewApp() app.Name = "smoke-test" app.Usage = "" @@ -88,6 +89,17 @@ func main() { Usage: "file size for generated random file in KB", Destination: &filesize, }, + cli.StringFlag{ + Name: "sync-mode", + Value: "pullsync", + Usage: "sync mode - pushsync or pullsync or both", + Destination: &syncMode, + }, + cli.BoolFlag{ + Name: "pushsync-delay", + Usage: "wait for content to be push synced", + Destination: &pushsyncDelay, + }, cli.BoolFlag{ Name: "sync-delay", Usage: "wait for content to be synced", diff --git a/cmd/swarm-smoke/upload_and_sync.go b/cmd/swarm-smoke/upload_and_sync.go index 7accef1c01..c3ae7d1c86 100644 --- a/cmd/swarm-smoke/upload_and_sync.go +++ b/cmd/swarm-smoke/upload_and_sync.go @@ -35,6 +35,7 @@ import ( "github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/storage" "github.com/ethersphere/swarm/testutil" + "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -232,7 +233,7 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin for i := range addrs { var foundAt int maxProx := -1 - var maxProxHost string + var maxProxHosts []string for host := range allHostChunks { if allHostChunks[host][i] == '1' { foundAt++ @@ -247,19 +248,43 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin prox := chunk.Proximity(addrs[i], ba) if prox > maxProx { maxProx = prox - maxProxHost = host + maxProxHosts = []string{host} + } else if prox == maxProx { + maxProxHosts = append(maxProxHosts, host) } } - if allHostChunks[maxProxHost][i] == '0' { - log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) - } else { - log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + log.Debug("sync mode", "sync mode", syncMode) + + if syncMode == "pullsync" || syncMode == "both" { + for _, maxProxHost := range maxProxHosts { + if allHostChunks[maxProxHost][i] == '0' { + log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + } else { + log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + } + } + + // if chunk found at less than 2 hosts, which is actually less that the min size of a NN + if foundAt < 2 { + log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i]) + } } - // if chunk found at less than 2 hosts - if foundAt < 2 { - log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i]) + if syncMode == "pushsync" { + var found bool + for _, maxProxHost := range maxProxHosts { + if allHostChunks[maxProxHost][i] == '1' { + found = true + log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + } + } + + if !found { + for _, maxProxHost := range maxProxHosts { + log.Error("chunk not found at any max prox host", "ref", addrs[i], "hosts", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) + } + } } } } @@ -284,7 +309,8 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error { log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) t1 := time.Now() - hash, err := upload(randomBytes, httpEndpoint(hosts[0])) + tag := uuid.New()[:8] + hash, err := uploadWithTag(randomBytes, httpEndpoint(hosts[0]), tag) if err != nil { log.Error(err.Error()) return err @@ -300,6 +326,11 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error { log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) + // wait to push sync sync + if pushsyncDelay { + waitToPushSynced(tag) + } + // wait to sync and log chunks before fetch attempt, only if syncDelay is set to true if syncDelay { waitToSync() @@ -338,6 +369,29 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error { return nil } +func isPushSynced(wsHost string, tagname string) (bool, error) { + rpcClient, err := rpc.Dial(wsHost) + if rpcClient != nil { + defer rpcClient.Close() + } + + if err != nil { + log.Error("error dialing host", "err", err) + return false, err + } + + var isSynced bool + err = rpcClient.Call(&isSynced, "bzz_isPushSynced", tagname) + if err != nil { + log.Error("error calling host for isPushSynced", "err", err) + return false, err + } + + log.Debug("isSynced result", "host", wsHost, "isSynced", isSynced) + + return isSynced, nil +} + func isSyncing(wsHost string) (bool, error) { rpcClient, err := rpc.Dial(wsHost) if rpcClient != nil { @@ -361,6 +415,20 @@ func isSyncing(wsHost string) (bool, error) { return isSyncing, nil } +func waitToPushSynced(tagname string) { + for { + synced, err := isPushSynced(wsEndpoint(hosts[0]), tagname) + if err != nil { + log.Error(err.Error()) + } + + if synced { + return + } + time.Sleep(200 * time.Millisecond) + } +} + func waitToSync() { t1 := time.Now() diff --git a/cmd/swarm-smoke/util.go b/cmd/swarm-smoke/util.go index f838fa07e5..99f4aa5a4e 100644 --- a/cmd/swarm-smoke/util.go +++ b/cmd/swarm-smoke/util.go @@ -38,6 +38,7 @@ import ( "github.com/ethersphere/swarm/api/client" "github.com/ethersphere/swarm/spancontext" opentracing "github.com/opentracing/opentracing-go" + "github.com/pborman/uuid" cli "gopkg.in/urfave/cli.v1" ) @@ -193,8 +194,13 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error { return nil } -// upload an arbitrary byte as a plaintext file to `endpoint` using the api client +// upload an arbitrary byte as a plaintext file to `endpoint` using the api client func upload(data []byte, endpoint string) (string, error) { + return uploadWithTag(data, endpoint, uuid.New()[:8]) +} + +// uploadWithTag an arbitrary byte as a plaintext file to `endpoint` using the api client with a given tag +func uploadWithTag(data []byte, endpoint string, tag string) (string, error) { swarm := client.NewClient(endpoint) f := &client.File{ ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), @@ -203,10 +209,10 @@ func upload(data []byte, endpoint string) (string, error) { Mode: 0660, Size: int64(len(data)), }, + Tag: tag, } - // upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. - return swarm.Upload(f, "", false) + return swarm.TarUpload("", &client.FileUploader{f}, "", false) } func digest(r io.Reader) ([]byte, error) {