cmd/swarm/swarm-smoke: Trigger chunk debug on timeout (#19101)
* cmd/swarm/swarm-smoke: first version trigger has-chunks on timeout * cmd/swarm/swarm-smoke: finalize trigger to chunk debug * cmd/swarm/swarm-smoke: fixed httpEndpoint for trigger * cmd/swarm/swarm-smoke: port * cmd/swarm/swarm-smoke: ws not rpc * cmd/swarm/swarm-smoke: added debug output * cmd/swarm/swarm-smoke: addressed PR comments * cmd/swarm/swarm-smoke: renamed track-timeout and track-chunks
This commit is contained in:
		@@ -37,15 +37,16 @@ var (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	allhosts  string
 | 
			
		||||
	hosts     []string
 | 
			
		||||
	filesize  int
 | 
			
		||||
	syncDelay int
 | 
			
		||||
	httpPort  int
 | 
			
		||||
	wsPort    int
 | 
			
		||||
	verbosity int
 | 
			
		||||
	timeout   int
 | 
			
		||||
	single    bool
 | 
			
		||||
	allhosts     string
 | 
			
		||||
	hosts        []string
 | 
			
		||||
	filesize     int
 | 
			
		||||
	syncDelay    int
 | 
			
		||||
	httpPort     int
 | 
			
		||||
	wsPort       int
 | 
			
		||||
	verbosity    int
 | 
			
		||||
	timeout      int
 | 
			
		||||
	single       bool
 | 
			
		||||
	trackTimeout int
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
@@ -102,6 +103,12 @@ func main() {
 | 
			
		||||
			Usage:       "whether to fetch content from a single node or from all nodes",
 | 
			
		||||
			Destination: &single,
 | 
			
		||||
		},
 | 
			
		||||
		cli.IntFlag{
 | 
			
		||||
			Name:        "track-timeout",
 | 
			
		||||
			Value:       5,
 | 
			
		||||
			Usage:       "timeout in seconds to wait for GetAllReferences to return",
 | 
			
		||||
			Destination: &trackTimeout,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	app.Flags = append(app.Flags, []cli.Flag{
 | 
			
		||||
 
 | 
			
		||||
@@ -18,13 +18,19 @@ package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"os"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/ethereum/go-ethereum/log"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/metrics"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/rpc"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/swarm/api"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/swarm/storage"
 | 
			
		||||
	"github.com/ethereum/go-ethereum/swarm/testutil"
 | 
			
		||||
	"github.com/pborman/uuid"
 | 
			
		||||
 | 
			
		||||
@@ -49,12 +55,75 @@ func uploadAndSyncCmd(ctx *cli.Context, tuid string) error {
 | 
			
		||||
	case <-time.After(time.Duration(timeout) * time.Second):
 | 
			
		||||
		metrics.GetOrRegisterCounter(fmt.Sprintf("%s.timeout", commandName), nil).Inc(1)
 | 
			
		||||
 | 
			
		||||
		e := fmt.Errorf("timeout after %v sec", timeout)
 | 
			
		||||
		// trigger debug functionality on randomBytes
 | 
			
		||||
		err := trackChunks(randomBytes[:])
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			e = fmt.Errorf("%v; triggerChunkDebug failed: %v", e, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return fmt.Errorf("timeout after %v sec", timeout)
 | 
			
		||||
		return e
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func trackChunks(testData []byte) error {
 | 
			
		||||
	log.Warn("Test timed out; running chunk debug sequence")
 | 
			
		||||
 | 
			
		||||
	addrs, err := getAllRefs(testData)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	log.Trace("All references retrieved")
 | 
			
		||||
 | 
			
		||||
	// has-chunks
 | 
			
		||||
	for _, host := range hosts {
 | 
			
		||||
		httpHost := fmt.Sprintf("ws://%s:%d", host, 8546)
 | 
			
		||||
		log.Trace("Calling `Has` on host", "httpHost", httpHost)
 | 
			
		||||
		rpcClient, err := rpc.Dial(httpHost)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Trace("Error dialing host", "err", err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		log.Trace("rpc dial ok")
 | 
			
		||||
		var hasInfo []api.HasInfo
 | 
			
		||||
		err = rpcClient.Call(&hasInfo, "bzz_has", addrs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			log.Trace("Error calling host", "err", err)
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		log.Trace("rpc call ok")
 | 
			
		||||
		count := 0
 | 
			
		||||
		for _, info := range hasInfo {
 | 
			
		||||
			if !info.Has {
 | 
			
		||||
				count++
 | 
			
		||||
				log.Error("Host does not have chunk", "host", httpHost, "chunk", info.Addr)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if count == 0 {
 | 
			
		||||
			log.Info("Host reported to have all chunks", "host", httpHost)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getAllRefs(testData []byte) (storage.AddressCollection, error) {
 | 
			
		||||
	log.Trace("Getting all references for given root hash")
 | 
			
		||||
	datadir, err := ioutil.TempDir("", "chunk-debug")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("unable to create temp dir: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	defer os.RemoveAll(datadir)
 | 
			
		||||
	fileStore, err := storage.NewLocalFileStore(datadir, make([]byte, 32))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(trackTimeout)*time.Second)
 | 
			
		||||
	defer cancel()
 | 
			
		||||
 | 
			
		||||
	reader := bytes.NewReader(testData)
 | 
			
		||||
	return fileStore.GetAllReferences(ctx, reader, false)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func uplaodAndSync(c *cli.Context, randomBytes []byte, tuid string) error {
 | 
			
		||||
	log.Info("uploading to "+httpEndpoint(hosts[0])+" and syncing", "tuid", tuid, "seed", seed)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user