swarm-smoke: fix check max prox hosts for pull/push sync modes (#1578)

This commit is contained in:
Anton Evangelatov
2019-07-17 16:21:50 +02:00
committed by GitHub
parent ad77f43b9d
commit 48857ae88e
3 changed files with 112 additions and 26 deletions

View File

@ -37,22 +37,23 @@ var (
) )
var ( var (
allhosts string allhosts string
hosts []string hosts []string
filesize int filesize int
syncDelay bool syncDelay bool
inputSeed int pushsyncDelay bool
httpPort int syncMode string
wsPort int inputSeed int
verbosity int httpPort int
timeout int wsPort int
single bool verbosity int
onlyUpload bool timeout int
debug bool single bool
onlyUpload bool
debug bool
) )
func main() { func main() {
app := cli.NewApp() app := cli.NewApp()
app.Name = "smoke-test" app.Name = "smoke-test"
app.Usage = "" app.Usage = ""
@ -88,6 +89,17 @@ func main() {
Usage: "file size for generated random file in KB", Usage: "file size for generated random file in KB",
Destination: &filesize, Destination: &filesize,
}, },
cli.StringFlag{
Name: "sync-mode",
Value: "pullsync",
Usage: "sync mode - pushsync or pullsync or both",
Destination: &syncMode,
},
cli.BoolFlag{
Name: "pushsync-delay",
Usage: "wait for content to be push synced",
Destination: &pushsyncDelay,
},
cli.BoolFlag{ cli.BoolFlag{
Name: "sync-delay", Name: "sync-delay",
Usage: "wait for content to be synced", Usage: "wait for content to be synced",

View File

@ -35,6 +35,7 @@ import (
"github.com/ethersphere/swarm/chunk" "github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/storage" "github.com/ethersphere/swarm/storage"
"github.com/ethersphere/swarm/testutil" "github.com/ethersphere/swarm/testutil"
"github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1" cli "gopkg.in/urfave/cli.v1"
) )
@ -232,7 +233,7 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin
for i := range addrs { for i := range addrs {
var foundAt int var foundAt int
maxProx := -1 maxProx := -1
var maxProxHost string var maxProxHosts []string
for host := range allHostChunks { for host := range allHostChunks {
if allHostChunks[host][i] == '1' { if allHostChunks[host][i] == '1' {
foundAt++ foundAt++
@ -247,19 +248,43 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin
prox := chunk.Proximity(addrs[i], ba) prox := chunk.Proximity(addrs[i], ba)
if prox > maxProx { if prox > maxProx {
maxProx = prox maxProx = prox
maxProxHost = host maxProxHosts = []string{host}
} else if prox == maxProx {
maxProxHosts = append(maxProxHosts, host)
} }
} }
if allHostChunks[maxProxHost][i] == '0' { log.Debug("sync mode", "sync mode", syncMode)
log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
} else { if syncMode == "pullsync" || syncMode == "both" {
log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) for _, maxProxHost := range maxProxHosts {
if allHostChunks[maxProxHost][i] == '0' {
log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
} else {
log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
}
}
// if chunk found at less than 2 hosts, which is actually less that the min size of a NN
if foundAt < 2 {
log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i])
}
} }
// if chunk found at less than 2 hosts if syncMode == "pushsync" {
if foundAt < 2 { var found bool
log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i]) for _, maxProxHost := range maxProxHosts {
if allHostChunks[maxProxHost][i] == '1' {
found = true
log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
}
}
if !found {
for _, maxProxHost := range maxProxHosts {
log.Error("chunk not found at any max prox host", "ref", addrs[i], "hosts", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
}
}
} }
} }
} }
@ -284,7 +309,8 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed) log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "seed", seed)
t1 := time.Now() t1 := time.Now()
hash, err := upload(randomBytes, httpEndpoint(hosts[0])) tag := uuid.New()[:8]
hash, err := uploadWithTag(randomBytes, httpEndpoint(hosts[0]), tag)
if err != nil { if err != nil {
log.Error(err.Error()) log.Error(err.Error())
return err return err
@ -300,6 +326,11 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash)) log.Info("uploaded successfully", "hash", hash, "took", t2, "digest", fmt.Sprintf("%x", fhash))
// wait to push sync sync
if pushsyncDelay {
waitToPushSynced(tag)
}
// wait to sync and log chunks before fetch attempt, only if syncDelay is set to true // wait to sync and log chunks before fetch attempt, only if syncDelay is set to true
if syncDelay { if syncDelay {
waitToSync() waitToSync()
@ -338,6 +369,29 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
return nil return nil
} }
func isPushSynced(wsHost string, tagname string) (bool, error) {
rpcClient, err := rpc.Dial(wsHost)
if rpcClient != nil {
defer rpcClient.Close()
}
if err != nil {
log.Error("error dialing host", "err", err)
return false, err
}
var isSynced bool
err = rpcClient.Call(&isSynced, "bzz_isPushSynced", tagname)
if err != nil {
log.Error("error calling host for isPushSynced", "err", err)
return false, err
}
log.Debug("isSynced result", "host", wsHost, "isSynced", isSynced)
return isSynced, nil
}
func isSyncing(wsHost string) (bool, error) { func isSyncing(wsHost string) (bool, error) {
rpcClient, err := rpc.Dial(wsHost) rpcClient, err := rpc.Dial(wsHost)
if rpcClient != nil { if rpcClient != nil {
@ -361,6 +415,20 @@ func isSyncing(wsHost string) (bool, error) {
return isSyncing, nil return isSyncing, nil
} }
func waitToPushSynced(tagname string) {
for {
synced, err := isPushSynced(wsEndpoint(hosts[0]), tagname)
if err != nil {
log.Error(err.Error())
}
if synced {
return
}
time.Sleep(200 * time.Millisecond)
}
}
func waitToSync() { func waitToSync() {
t1 := time.Now() t1 := time.Now()

View File

@ -38,6 +38,7 @@ import (
"github.com/ethersphere/swarm/api/client" "github.com/ethersphere/swarm/api/client"
"github.com/ethersphere/swarm/spancontext" "github.com/ethersphere/swarm/spancontext"
opentracing "github.com/opentracing/opentracing-go" opentracing "github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
cli "gopkg.in/urfave/cli.v1" cli "gopkg.in/urfave/cli.v1"
) )
@ -193,8 +194,13 @@ func fetch(hash string, endpoint string, original []byte, ruid string) error {
return nil return nil
} }
// upload an arbitrary byte as a plaintext file to `endpoint` using the api client // upload an arbitrary byte as a plaintext file to `endpoint` using the api client
func upload(data []byte, endpoint string) (string, error) { func upload(data []byte, endpoint string) (string, error) {
return uploadWithTag(data, endpoint, uuid.New()[:8])
}
// uploadWithTag an arbitrary byte as a plaintext file to `endpoint` using the api client with a given tag
func uploadWithTag(data []byte, endpoint string, tag string) (string, error) {
swarm := client.NewClient(endpoint) swarm := client.NewClient(endpoint)
f := &client.File{ f := &client.File{
ReadCloser: ioutil.NopCloser(bytes.NewReader(data)), ReadCloser: ioutil.NopCloser(bytes.NewReader(data)),
@ -203,10 +209,10 @@ func upload(data []byte, endpoint string) (string, error) {
Mode: 0660, Mode: 0660,
Size: int64(len(data)), Size: int64(len(data)),
}, },
Tag: tag,
} }
// upload data to bzz:// and retrieve the content-addressed manifest hash, hex-encoded. return swarm.TarUpload("", &client.FileUploader{f}, "", false)
return swarm.Upload(f, "", false)
} }
func digest(r io.Reader) ([]byte, error) { func digest(r io.Reader) ([]byte, error) {