all: blindly swap out glog to our log15, logs need rework
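Every hunk below applies the same mechanical substitution: a verbosity-gated glog call becomes a leveled call on the log15-based log package, with the old format string wrapped in fmt.Sprintf as the message and no key/value context yet (which is why the title says the logs still need rework). A minimal sketch of the mapping, using the package-level helpers of github.com/ethereum/go-ethereum/log; the wrapper function itself is illustrative, not part of the change:

package example

import (
	"fmt"

	"github.com/ethereum/go-ethereum/log"
)

// resolveLog is a hypothetical helper showing the pattern repeated in every hunk:
// old: glog.V(logger.Detail).Infof("Resolving : %v", hostPort)
// new: the formatted string becomes the whole message; the levels map roughly as
// logger.Detail -> log.Trace, logger.Debug -> log.Debug, logger.Info -> log.Info,
// logger.Warn -> log.Warn, logger.Error -> log.Error.
func resolveLog(hostPort string) {
	log.Trace(fmt.Sprintf("Resolving : %v", hostPort))
}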
@@ -25,8 +25,7 @@ import (
 "sync"

 "github.com/ethereum/go-ethereum/common"
-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/swarm/storage"
 )

@@ -72,9 +71,9 @@ type ErrResolve error

 // DNS Resolver
 func (self *Api) Resolve(hostPort string, nameresolver bool) (storage.Key, error) {
-glog.V(logger.Detail).Infof("Resolving : %v", hostPort)
+log.Trace(fmt.Sprintf("Resolving : %v", hostPort))
 if hashMatcher.MatchString(hostPort) || self.dns == nil {
-glog.V(logger.Detail).Infof("host is a contentHash: '%v'", hostPort)
+log.Trace(fmt.Sprintf("host is a contentHash: '%v'", hostPort))
 return storage.Key(common.Hex2Bytes(hostPort)), nil
 }
 if !nameresolver {
@@ -83,9 +82,9 @@ func (self *Api) Resolve(hostPort string, nameresolver bool) (storage.Key, error
 contentHash, err := self.dns.Resolve(hostPort)
 if err != nil {
 err = ErrResolve(err)
-glog.V(logger.Warn).Infof("DNS error : %v", err)
+log.Warn(fmt.Sprintf("DNS error : %v", err))
 }
-glog.V(logger.Detail).Infof("host lookup: %v -> %v", err)
+log.Trace(fmt.Sprintf("host lookup: %v -> %v", err))
 return contentHash[:], err
 }
 func Parse(uri string) (hostPort, path string) {
@@ -110,7 +109,7 @@ func Parse(uri string) (hostPort, path string) {
 path = parts[i]
 }
 }
-glog.V(logger.Debug).Infof("host: '%s', path '%s' requested.", hostPort, path)
+log.Debug(fmt.Sprintf("host: '%s', path '%s' requested.", hostPort, path))
 return
 }

@@ -118,7 +117,7 @@ func (self *Api) parseAndResolve(uri string, nameresolver bool) (key storage.Key
 hostPort, path = Parse(uri)
 //resolving host and port
 contentHash, err := self.Resolve(hostPort, nameresolver)
-glog.V(logger.Debug).Infof("Resolved '%s' to contentHash: '%s', path: '%s'", uri, contentHash, path)
+log.Debug(fmt.Sprintf("Resolved '%s' to contentHash: '%s', path: '%s'", uri, contentHash, path))
 return contentHash[:], hostPort, path, err
 }

@@ -152,11 +151,11 @@ func (self *Api) Get(uri string, nameresolver bool) (reader storage.LazySectionR
 quitC := make(chan bool)
 trie, err := loadManifest(self.dpa, key, quitC)
 if err != nil {
-glog.V(logger.Warn).Infof("loadManifestTrie error: %v", err)
+log.Warn(fmt.Sprintf("loadManifestTrie error: %v", err))
 return
 }

-glog.V(logger.Detail).Infof("getEntry(%s)", path)
+log.Trace(fmt.Sprintf("getEntry(%s)", path))

 entry, _ := trie.getEntry(path)

@@ -164,12 +163,12 @@ func (self *Api) Get(uri string, nameresolver bool) (reader storage.LazySectionR
 key = common.Hex2Bytes(entry.Hash)
 status = entry.Status
 mimeType = entry.ContentType
-glog.V(logger.Detail).Infof("content lookup key: '%v' (%v)", key, mimeType)
+log.Trace(fmt.Sprintf("content lookup key: '%v' (%v)", key, mimeType))
 reader = self.dpa.Retrieve(key)
 } else {
 status = http.StatusNotFound
 err = fmt.Errorf("manifest entry for '%s' not found", path)
-glog.V(logger.Warn).Infof("%v", err)
+log.Warn(fmt.Sprintf("%v", err))
 }
 return
 }
@@ -17,13 +17,13 @@
 package api

 import (
+"fmt"
 "io"
 "io/ioutil"
 "os"
 "testing"

-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/swarm/storage"
 )

@@ -76,7 +76,7 @@ func checkResponse(t *testing.T, resp *testResponse, exp *Response) {

 // func expResponse(content []byte, mimeType string, status int) *Response {
 func expResponse(content string, mimeType string, status int) *Response {
-glog.V(logger.Detail).Infof("expected content (%v): %v ", len(content), content)
+log.Trace(fmt.Sprintf("expected content (%v): %v ", len(content), content))
 return &Response{mimeType, status, int64(len(content)), content}
 }

@@ -91,7 +91,7 @@ func testGet(t *testing.T, api *Api, bzzhash string) *testResponse {
 if err != nil {
 t.Fatalf("unexpected error: %v", err)
 }
-glog.V(logger.Detail).Infof("reader size: %v ", size)
+log.Trace(fmt.Sprintf("reader size: %v ", size))
 s := make([]byte, size)
 _, err = reader.Read(s)
 if err != io.EOF {
@@ -26,8 +26,7 @@ import (
 "sync"

 "github.com/ethereum/go-ethereum/common"
-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/swarm/storage"
 )

@@ -63,7 +62,7 @@ func (self *FileSystem) Upload(lpath, index string) (string, error) {
 var start int
 if stat.IsDir() {
 start = len(localpath)
-glog.V(logger.Debug).Infof("uploading '%s'", localpath)
+log.Debug(fmt.Sprintf("uploading '%s'", localpath))
 err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
 if (err == nil) && !info.IsDir() {
 //fmt.Printf("lp %s path %s\n", localpath, path)
@@ -198,7 +197,7 @@ func (self *FileSystem) Download(bzzpath, localpath string) error {
 quitC := make(chan bool)
 trie, err := loadManifest(self.api.dpa, key, quitC)
 if err != nil {
-glog.V(logger.Warn).Infof("fs.Download: loadManifestTrie error: %v", err)
+log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
 return err
 }

@@ -212,7 +211,7 @@ func (self *FileSystem) Download(bzzpath, localpath string) error {

 prevPath := lpath
 err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
-glog.V(logger.Detail).Infof("fs.Download: %#v", entry)
+log.Trace(fmt.Sprintf("fs.Download: %#v", entry))

 key = common.Hex2Bytes(entry.Hash)
 path := lpath + "/" + suffix
@@ -20,8 +20,7 @@ import (
 "fmt"
 "net/http"

-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 )

 /*
@@ -58,7 +57,7 @@ func (self *RoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err
 host = "localhost"
 }
 url := fmt.Sprintf("http://%s:%s/%s:/%s/%s", host, self.Port, req.Proto, req.URL.Host, req.URL.Path)
-glog.V(logger.Info).Infof("roundtripper: proxying request '%s' to '%s'", req.RequestURI, url)
+log.Info(fmt.Sprintf("roundtripper: proxying request '%s' to '%s'", req.RequestURI, url))
 reqProxy, err := http.NewRequest(req.Method, url, req.Body)
 if err != nil {
 return nil, err
@@ -21,6 +21,7 @@ package http

 import (
 "bytes"
+"fmt"
 "io"
 "net/http"
 "regexp"
@@ -29,8 +30,7 @@ import (
 "time"

 "github.com/ethereum/go-ethereum/common"
-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/swarm/api"
 "github.com/ethereum/go-ethereum/swarm/storage"
 "github.com/rs/cors"
@@ -86,7 +86,7 @@ func StartHttpServer(api *api.Api, server *Server) {
 hdlr := c.Handler(serveMux)

 go http.ListenAndServe(server.Addr, hdlr)
-glog.V(logger.Info).Infof("Swarm HTTP proxy started on localhost:%s", server.Addr)
+log.Info(fmt.Sprintf("Swarm HTTP proxy started on localhost:%s", server.Addr))
 }

 func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
@@ -100,13 +100,13 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
 // return
 // }
 // }
-glog.V(logger.Debug).Infof("HTTP %s request URL: '%s', Host: '%s', Path: '%s', Referer: '%s', Accept: '%s'", r.Method, r.RequestURI, requestURL.Host, requestURL.Path, r.Referer(), r.Header.Get("Accept"))
+log.Debug(fmt.Sprintf("HTTP %s request URL: '%s', Host: '%s', Path: '%s', Referer: '%s', Accept: '%s'", r.Method, r.RequestURI, requestURL.Host, requestURL.Path, r.Referer(), r.Header.Get("Accept")))
 uri := requestURL.Path
 var raw, nameresolver bool
 var proto string

 // HTTP-based URL protocol handler
-glog.V(logger.Debug).Infof("BZZ request URI: '%s'", uri)
+log.Debug(fmt.Sprintf("BZZ request URI: '%s'", uri))

 path := bzzPrefix.ReplaceAllStringFunc(uri, func(p string) string {
 proto = p
@@ -115,24 +115,18 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {

 // protocol identification (ugly)
 if proto == "" {
-if glog.V(logger.Error) {
-glog.Errorf(
-"[BZZ] Swarm: Protocol error in request `%s`.",
-uri,
-)
-http.Error(w, "Invalid request URL: need access protocol (bzz:/, bzzr:/, bzzi:/) as first element in path.", http.StatusBadRequest)
-return
-}
+log.Error(fmt.Sprintf("[BZZ] Swarm: Protocol error in request `%s`.", uri))
+http.Error(w, "Invalid request URL: need access protocol (bzz:/, bzzr:/, bzzi:/) as first element in path.", http.StatusBadRequest)
+return
 }
 if len(proto) > 4 {
 raw = proto[1:5] == "bzzr"
 nameresolver = proto[1:5] != "bzzi"
 }

-glog.V(logger.Debug).Infof(
-"[BZZ] Swarm: %s request over protocol %s '%s' received.",
-r.Method, proto, path,
-)
+log.Debug("", "msg", log.Lazy{Fn: func() string {
+return fmt.Sprintf("[BZZ] Swarm: %s request over protocol %s '%s' received.", r.Method, proto, path)
+}})

 switch {
 case r.Method == "POST" || r.Method == "PUT":
@@ -142,7 +136,7 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
 }
 key, err := a.Store(io.LimitReader(r.Body, r.ContentLength), r.ContentLength, nil)
 if err == nil {
-glog.V(logger.Debug).Infof("Content for %v stored", key.Log())
+log.Debug(fmt.Sprintf("Content for %v stored", key.Log()))
 } else {
 http.Error(w, err.Error(), http.StatusBadRequest)
 return
@@ -164,10 +158,10 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
 path = api.RegularSlashes(path)
 mime := r.Header.Get("Content-Type")
 // TODO proper root hash separation
-glog.V(logger.Debug).Infof("Modify '%s' to store %v as '%s'.", path, key.Log(), mime)
+log.Debug(fmt.Sprintf("Modify '%s' to store %v as '%s'.", path, key.Log(), mime))
 newKey, err := a.Modify(path, common.Bytes2Hex(key), mime, nameresolver)
 if err == nil {
-glog.V(logger.Debug).Infof("Swarm replaced manifest by '%s'", newKey)
+log.Debug(fmt.Sprintf("Swarm replaced manifest by '%s'", newKey))
 w.Header().Set("Content-Type", "text/plain")
 http.ServeContent(w, r, "", time.Now(), bytes.NewReader([]byte(newKey)))
 } else {
@@ -182,10 +176,10 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
 return
 } else {
 path = api.RegularSlashes(path)
-glog.V(logger.Debug).Infof("Delete '%s'.", path)
+log.Debug(fmt.Sprintf("Delete '%s'.", path))
 newKey, err := a.Modify(path, "", "", nameresolver)
 if err == nil {
-glog.V(logger.Debug).Infof("Swarm replaced manifest by '%s'", newKey)
+log.Debug(fmt.Sprintf("Swarm replaced manifest by '%s'", newKey))
 w.Header().Set("Content-Type", "text/plain")
 http.ServeContent(w, r, "", time.Now(), bytes.NewReader([]byte(newKey)))
 } else {
@@ -206,7 +200,7 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
 if parsedurl == path {
 key, err := a.Resolve(parsedurl, nameresolver)
 if err != nil {
-glog.V(logger.Error).Infof("%v", err)
+log.Error(fmt.Sprintf("%v", err))
 http.Error(w, err.Error(), http.StatusBadRequest)
 return
 }
@@ -226,12 +220,12 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
 quitC := make(chan bool)
 size, err := reader.Size(quitC)
 if err != nil {
-glog.V(logger.Debug).Infof("Could not determine size: %v", err.Error())
+log.Debug(fmt.Sprintf("Could not determine size: %v", err.Error()))
 //An error on call to Size means we don't have the root chunk
 http.Error(w, err.Error(), http.StatusNotFound)
 return
 }
-glog.V(logger.Debug).Infof("Reading %d bytes.", size)
+log.Debug(fmt.Sprintf("Reading %d bytes.", size))

 // setting mime type
 qv := requestURL.Query()
@@ -242,11 +236,11 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {

 w.Header().Set("Content-Type", mimeType)
 http.ServeContent(w, r, uri, forever(), reader)
-glog.V(logger.Debug).Infof("Serve raw content '%s' (%d bytes) as '%s'", uri, size, mimeType)
+log.Debug(fmt.Sprintf("Serve raw content '%s' (%d bytes) as '%s'", uri, size, mimeType))

 // retrieve path via manifest
 } else {
-glog.V(logger.Debug).Infof("Structured GET request '%s' received.", uri)
+log.Debug(fmt.Sprintf("Structured GET request '%s' received.", uri))
 // add trailing slash, if missing
 if rootDocumentUri.MatchString(uri) {
 http.Redirect(w, r, path+"/", http.StatusFound)
@@ -255,10 +249,10 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
 reader, mimeType, status, err := a.Get(path, nameresolver)
 if err != nil {
 if _, ok := err.(api.ErrResolve); ok {
-glog.V(logger.Debug).Infof("%v", err)
+log.Debug(fmt.Sprintf("%v", err))
 status = http.StatusBadRequest
 } else {
-glog.V(logger.Debug).Infof("error retrieving '%s': %v", uri, err)
+log.Debug(fmt.Sprintf("error retrieving '%s': %v", uri, err))
 status = http.StatusNotFound
 }
 http.Error(w, err.Error(), status)
@@ -274,12 +268,12 @@ func handler(w http.ResponseWriter, r *http.Request, a *api.Api) {
 quitC := make(chan bool)
 size, err := reader.Size(quitC)
 if err != nil {
-glog.V(logger.Debug).Infof("Could not determine size: %v", err.Error())
+log.Debug(fmt.Sprintf("Could not determine size: %v", err.Error()))
 //An error on call to Size means we don't have the root chunk
 http.Error(w, err.Error(), http.StatusNotFound)
 return
 }
-glog.V(logger.Debug).Infof("Served '%s' (%d bytes) as '%s' (status code: %v)", uri, size, mimeType, status)
+log.Debug(fmt.Sprintf("Served '%s' (%d bytes) as '%s' (status code: %v)", uri, size, mimeType, status))

 http.ServeContent(w, r, path, forever(), reader)

@@ -293,11 +287,11 @@ func (self *sequentialReader) ReadAt(target []byte, off int64) (n int, err error
 self.lock.Lock()
 // assert self.pos <= off
 if self.pos > off {
-glog.V(logger.Error).Infof("non-sequential read attempted from sequentialReader; %d > %d", self.pos, off)
+log.Error(fmt.Sprintf("non-sequential read attempted from sequentialReader; %d > %d", self.pos, off))
 panic("Non-sequential read attempt")
 }
 if self.pos != off {
-glog.V(logger.Debug).Infof("deferred read in POST at position %d, offset %d.", self.pos, off)
+log.Debug(fmt.Sprintf("deferred read in POST at position %d, offset %d.", self.pos, off))
 wait := make(chan bool)
 self.ahead[off] = wait
 self.lock.Unlock()
@@ -313,9 +307,9 @@ func (self *sequentialReader) ReadAt(target []byte, off int64) (n int, err error
 for localPos < len(target) {
 n, err = self.reader.Read(target[localPos:])
 localPos += n
-glog.V(logger.Debug).Infof("Read %d bytes into buffer size %d from POST, error %v.", n, len(target), err)
+log.Debug(fmt.Sprintf("Read %d bytes into buffer size %d from POST, error %v.", n, len(target), err))
 if err != nil {
-glog.V(logger.Debug).Infof("POST stream's reading terminated with %v.", err)
+log.Debug(fmt.Sprintf("POST stream's reading terminated with %v.", err))
 for i := range self.ahead {
 self.ahead[i] <- true
 delete(self.ahead, i)
@@ -327,7 +321,7 @@ func (self *sequentialReader) ReadAt(target []byte, off int64) (n int, err error
 }
 wait := self.ahead[self.pos]
 if wait != nil {
-glog.V(logger.Debug).Infof("deferred read in POST at position %d triggered.", self.pos)
+log.Debug(fmt.Sprintf("deferred read in POST at position %d triggered.", self.pos))
 delete(self.ahead, self.pos)
 close(wait)
 }
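The only non-mechanical rewrite in the HTTP handler above is the request log: the glog.V(logger.Debug) guard around a multi-line Infof is replaced with log.Lazy, which defers the fmt.Sprintf until the record is actually emitted at Debug level. A short, self-contained sketch of that form (the message text is taken from the hunk; the wrapper function and its parameters are illustrative):

package example

import (
	"fmt"

	"github.com/ethereum/go-ethereum/log"
)

// logRequestLazily defers formatting: the closure only runs if the Debug record
// is actually written, so the Sprintf cost is avoided when Debug is filtered out.
func logRequestLazily(method, proto, path string) {
	log.Debug("", "msg", log.Lazy{Fn: func() string {
		return fmt.Sprintf("[BZZ] Swarm: %s request over protocol %s '%s' received.", method, proto, path)
	}})
}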
@@ -23,8 +23,7 @@ import (
 "sync"

 "github.com/ethereum/go-ethereum/common"
-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/swarm/storage"
 )

@@ -52,7 +51,7 @@ type manifestTrieEntry struct {

 func loadManifest(dpa *storage.DPA, hash storage.Key, quitC chan bool) (trie *manifestTrie, err error) { // non-recursive, subtrees are downloaded on-demand

-glog.V(logger.Detail).Infof("manifest lookup key: '%v'.", hash.Log())
+log.Trace(fmt.Sprintf("manifest lookup key: '%v'.", hash.Log()))
 // retrieve manifest via DPA
 manifestReader := dpa.Retrieve(hash)
 return readManifest(manifestReader, hash, dpa, quitC)
@@ -70,23 +69,23 @@ func readManifest(manifestReader storage.LazySectionReader, hash storage.Key, dp
 manifestData := make([]byte, size)
 read, err := manifestReader.Read(manifestData)
 if int64(read) < size {
-glog.V(logger.Detail).Infof("Manifest %v not found.", hash.Log())
+log.Trace(fmt.Sprintf("Manifest %v not found.", hash.Log()))
 if err == nil {
 err = fmt.Errorf("Manifest retrieval cut short: read %v, expect %v", read, size)
 }
 return
 }

-glog.V(logger.Detail).Infof("Manifest %v retrieved", hash.Log())
+log.Trace(fmt.Sprintf("Manifest %v retrieved", hash.Log()))
 man := manifestJSON{}
 err = json.Unmarshal(manifestData, &man)
 if err != nil {
 err = fmt.Errorf("Manifest %v is malformed: %v", hash.Log(), err)
-glog.V(logger.Detail).Infof("%v", err)
+log.Trace(fmt.Sprintf("%v", err))
 return
 }

-glog.V(logger.Detail).Infof("Manifest %v has %d entries.", hash.Log(), len(man.Entries))
+log.Trace(fmt.Sprintf("Manifest %v has %d entries.", hash.Log(), len(man.Entries)))

 trie = &manifestTrie{
 dpa: dpa,
@@ -286,7 +285,7 @@ func (self *manifestTrie) listWithPrefix(prefix string, quitC chan bool, cb func

 func (self *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *manifestTrieEntry, pos int) {

-glog.V(logger.Detail).Infof("findPrefixOf(%s)", path)
+log.Trace(fmt.Sprintf("findPrefixOf(%s)", path))

 if len(path) == 0 {
 return self.entries[256], 0
@@ -298,9 +297,9 @@ func (self *manifestTrie) findPrefixOf(path string, quitC chan bool) (entry *man
 return self.entries[256], 0
 }
 epl := len(entry.Path)
-glog.V(logger.Detail).Infof("path = %v entry.Path = %v epl = %v", path, entry.Path, epl)
+log.Trace(fmt.Sprintf("path = %v entry.Path = %v epl = %v", path, entry.Path, epl))
 if (len(path) >= epl) && (path[:epl] == entry.Path) {
-glog.V(logger.Detail).Infof("entry.ContentType = %v", entry.ContentType)
+log.Trace(fmt.Sprintf("entry.ContentType = %v", entry.ContentType))
 if entry.ContentType == manifestType {
 err := self.loadSubTrie(entry, quitC)
 if err != nil {
@@ -19,10 +19,10 @@ package network
 import (
 "bytes"
 "encoding/binary"
+"fmt"
 "time"

-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/swarm/storage"
 )

@@ -60,8 +60,8 @@ func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
 missing = append(missing, req)
 }
 }
-glog.V(logger.Debug).Infof("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State)
-glog.V(logger.Detail).Infof("Depo.HandleUnsyncedKeysMsg: received %v", unsynced)
+log.Debug(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v unsynced keys: %v missing. new state: %v", len(unsynced), len(missing), req.State))
+log.Trace(fmt.Sprintf("Depo.HandleUnsyncedKeysMsg: received %v", unsynced))
 // send delivery request with missing keys
 err = p.deliveryRequest(missing)
 if err != nil {
@@ -81,7 +81,7 @@ func (self *Depo) HandleUnsyncedKeysMsg(req *unsyncedKeysMsgData, p *peer) error
 func (self *Depo) HandleDeliveryRequestMsg(req *deliveryRequestMsgData, p *peer) error {
 deliver := req.Deliver
 // queue the actual delivery of a chunk ()
-glog.V(logger.Detail).Infof("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver)
+log.Trace(fmt.Sprintf("Depo.HandleDeliveryRequestMsg: received %v delivery requests: %v", len(deliver), deliver))
 for _, sreq := range deliver {
 // TODO: look up in cache here or in deliveries
 // priorities are taken from the message so the remote party can
@@ -104,19 +104,19 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
 chunk, err := self.localStore.Get(req.Key)
 switch {
 case err != nil:
-glog.V(logger.Detail).Infof("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key)
+log.Trace(fmt.Sprintf("Depo.handleStoreRequest: %v not found locally. create new chunk/request", req.Key))
 // not found in memory cache, ie., a genuine store request
 // create chunk
 chunk = storage.NewChunk(req.Key, nil)

 case chunk.SData == nil:
 // found chunk in memory store, needs the data, validate now
-glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v. request entry found", req)
+log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v. request entry found", req))

 default:
 // data is found, store request ignored
 // this should update access count?
-glog.V(logger.Detail).Infof("Depo.HandleStoreRequest: %v found locally. ignore.", req)
+log.Trace(fmt.Sprintf("Depo.HandleStoreRequest: %v found locally. ignore.", req))
 islocal = true
 //return
 }
@@ -126,7 +126,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
 if !bytes.Equal(hasher.Sum(nil), req.Key) {
 // data does not validate, ignore
 // TODO: peer should be penalised/dropped?
-glog.V(logger.Warn).Infof("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req)
+log.Warn(fmt.Sprintf("Depo.HandleStoreRequest: chunk invalid. store request ignored: %v", req))
 return
 }

@@ -136,7 +136,7 @@ func (self *Depo) HandleStoreRequestMsg(req *storeRequestMsgData, p *peer) {
 // update chunk with size and data
 chunk.SData = req.SData // protocol validates that SData is minimum 9 bytes long (int64 size + at least one byte of data)
 chunk.Size = int64(binary.LittleEndian.Uint64(req.SData[0:8]))
-glog.V(logger.Detail).Infof("delivery of %v from %v", chunk, p)
+log.Trace(fmt.Sprintf("delivery of %v from %v", chunk, p))
 chunk.Source = p
 self.netStore.Put(chunk)
 }
@@ -152,7 +152,7 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
 err = p.swap.Add(1)
 }
 if err != nil {
-glog.V(logger.Warn).Infof("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err)
+log.Warn(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - cannot process request: %v", req.Key.Log(), err))
 return
 }

@@ -163,7 +163,7 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
 req = self.strategyUpdateRequest(chunk.Req, req)
 // check if we can immediately deliver
 if chunk.SData != nil {
-glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log())
+log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, delivering...", req.Key.Log()))

 if req.MaxSize == 0 || int64(req.MaxSize) >= chunk.Size {
 sreq := &storeRequestMsgData{
@@ -174,16 +174,16 @@ func (self *Depo) HandleRetrieveRequestMsg(req *retrieveRequestMsgData, p *peer)
 }
 p.syncer.addRequest(sreq, DeliverReq)
 } else {
-glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log())
+log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content found, not wanted", req.Key.Log()))
 }
 } else {
-glog.V(logger.Detail).Infof("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log())
+log.Trace(fmt.Sprintf("Depo.HandleRetrieveRequest: %v - content not found locally. asked swarm for help. will get back", req.Key.Log()))
 }
 }

 // add peer request the chunk and decides the timeout for the response if still searching
 func (self *Depo) strategyUpdateRequest(rs *storage.RequestStatus, origReq *retrieveRequestMsgData) (req *retrieveRequestMsgData) {
-glog.V(logger.Detail).Infof("Depo.strategyUpdateRequest: key %v", origReq.Key.Log())
+log.Trace(fmt.Sprintf("Depo.strategyUpdateRequest: key %v", origReq.Key.Log()))
 // we do not create an alternative one
 req = origReq
 if rs != nil {
@@ -211,7 +211,7 @@ only add if less than requesterCount peers forwarded the same request id so far
 note this is done irrespective of status (searching or found)
 */
 func (self *Depo) addRequester(rs *storage.RequestStatus, req *retrieveRequestMsgData) {
-glog.V(logger.Detail).Infof("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.from, req.Id)
+log.Trace(fmt.Sprintf("Depo.addRequester: key %v - add peer to req.Id %v", req.Key.Log(), req.from, req.Id))
 list := rs.Requesters[req.Id]
 rs.Requesters[req.Id] = append(list, req)
 }
@@ -17,11 +17,11 @@
 package network

 import (
+"fmt"
 "math/rand"
 "time"

-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/swarm/storage"
 )

@@ -56,10 +56,10 @@ var searchTimeout = 3 * time.Second
 // logic propagating retrieve requests to peers given by the kademlia hive
 func (self *forwarder) Retrieve(chunk *storage.Chunk) {
 peers := self.hive.getPeers(chunk.Key, 0)
-glog.V(logger.Detail).Infof("forwarder.Retrieve: %v - received %d peers from KΛÐΞMLIΛ...", chunk.Key.Log(), len(peers))
+log.Trace(fmt.Sprintf("forwarder.Retrieve: %v - received %d peers from KΛÐΞMLIΛ...", chunk.Key.Log(), len(peers)))
 OUT:
 for _, p := range peers {
-glog.V(logger.Detail).Infof("forwarder.Retrieve: sending retrieveRequest %v to peer [%v]", chunk.Key.Log(), p)
+log.Trace(fmt.Sprintf("forwarder.Retrieve: sending retrieveRequest %v to peer [%v]", chunk.Key.Log(), p))
 for _, recipients := range chunk.Req.Requesters {
 for _, recipient := range recipients {
 req := recipient.(*retrieveRequestMsgData)
@@ -80,7 +80,7 @@ OUT:
 p.retrieve(req)
 break OUT
 }
-glog.V(logger.Warn).Infof("forwarder.Retrieve: unable to send retrieveRequest to peer [%v]: %v", chunk.Key.Log(), err)
+log.Warn(fmt.Sprintf("forwarder.Retrieve: unable to send retrieveRequest to peer [%v]: %v", chunk.Key.Log(), err))
 }
 }

@@ -98,14 +98,14 @@ func (self *forwarder) Store(chunk *storage.Chunk) {
 source = chunk.Source.(*peer)
 }
 for _, p := range self.hive.getPeers(chunk.Key, 0) {
-glog.V(logger.Detail).Infof("forwarder.Store: %v %v", p, chunk)
+log.Trace(fmt.Sprintf("forwarder.Store: %v %v", p, chunk))

 if p.syncer != nil && (source == nil || p.Addr() != source.Addr()) {
 n++
 Deliver(p, msg, PropagateReq)
 }
 }
-glog.V(logger.Detail).Infof("forwarder.Store: sent to %v peers (chunk = %v)", n, chunk)
+log.Trace(fmt.Sprintf("forwarder.Store: sent to %v peers (chunk = %v)", n, chunk))
 }

 // once a chunk is found deliver it to its requesters unless timed out
@@ -123,7 +123,7 @@ func (self *forwarder) Deliver(chunk *storage.Chunk) {
 for id, r := range requesters {
 req = r.(*retrieveRequestMsgData)
 if req.timeout == nil || req.timeout.After(time.Now()) {
-glog.V(logger.Detail).Infof("forwarder.Deliver: %v -> %v", req.Id, req.from)
+log.Trace(fmt.Sprintf("forwarder.Deliver: %v -> %v", req.Id, req.from))
 msg.Id = uint64(id)
 Deliver(req.from, msg, DeliverReq)
 n++
@@ -133,7 +133,7 @@ func (self *forwarder) Deliver(chunk *storage.Chunk) {
 }
 }
 }
-glog.V(logger.Detail).Infof("forwarder.Deliver: submit chunk %v (request id %v) for delivery to %v peers", chunk.Key.Log(), id, n)
+log.Trace(fmt.Sprintf("forwarder.Deliver: submit chunk %v (request id %v) for delivery to %v peers", chunk.Key.Log(), id, n))
 }
 }

@@ -23,8 +23,7 @@ import (
 "time"

 "github.com/ethereum/go-ethereum/common"
-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/p2p/discover"
 "github.com/ethereum/go-ethereum/p2p/netutil"
 "github.com/ethereum/go-ethereum/swarm/network/kademlia"
@@ -129,7 +128,7 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee
 self.listenAddr = listenAddr
 err = self.kad.Load(self.path, nil)
 if err != nil {
-glog.V(logger.Warn).Infof("Warning: error reading kaddb '%s' (skipping): %v", self.path, err)
+log.Warn(fmt.Sprintf("Warning: error reading kaddb '%s' (skipping): %v", self.path, err))
 err = nil
 }
 // this loop is doing bootstrapping and maintains a healthy table
@@ -145,7 +144,7 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee
 node, need, proxLimit := self.kad.Suggest()

 if node != nil && len(node.Url) > 0 {
-glog.V(logger.Detail).Infof("call known bee %v", node.Url)
+log.Trace(fmt.Sprintf("call known bee %v", node.Url))
 // enode or any lower level connection address is unnecessary in future
 // discovery table is used to look it up.
 connectPeer(node.Url)
@@ -159,21 +158,21 @@ func (self *Hive) Start(id discover.NodeID, listenAddr func() string, connectPee
 req := &retrieveRequestMsgData{
 Key: storage.Key(randAddr[:]),
 }
-glog.V(logger.Detail).Infof("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0])
+log.Trace(fmt.Sprintf("call any bee near %v (PO%03d) - messenger bee: %v", randAddr, proxLimit, peers[0]))
 peers[0].(*peer).retrieve(req)
 } else {
-glog.V(logger.Warn).Infof("no peer")
+log.Warn(fmt.Sprintf("no peer"))
 }
-glog.V(logger.Detail).Infof("buzz kept alive")
+log.Trace(fmt.Sprintf("buzz kept alive"))
 } else {
-glog.V(logger.Info).Infof("no need for more bees")
+log.Info(fmt.Sprintf("no need for more bees"))
 }
 select {
 case self.toggle <- need:
 case <-self.quit:
 return
 }
-glog.V(logger.Debug).Infof("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount())
+log.Debug(fmt.Sprintf("queen's address: %v, population: %d (%d)", self.addr, self.kad.Count(), self.kad.DBCount()))
 }
 }()
 return
@@ -192,7 +191,7 @@ func (self *Hive) keepAlive() {
 if self.kad.DBCount() > 0 {
 select {
 case self.more <- true:
-glog.V(logger.Debug).Infof("buzz wakeup")
+log.Debug(fmt.Sprintf("buzz wakeup"))
 default:
 }
 }
@@ -224,7 +223,7 @@ func (self *Hive) addPeer(p *peer) error {
 default:
 }
 }()
-glog.V(logger.Detail).Infof("hi new bee %v", p)
+log.Trace(fmt.Sprintf("hi new bee %v", p))
 err := self.kad.On(p, loadSync)
 if err != nil {
 return err
@@ -235,21 +234,21 @@ func (self *Hive) addPeer(p *peer) error {
 // to send the 6 byte self lookup
 // we do not record as request or forward it, just reply with peers
 p.retrieve(&retrieveRequestMsgData{})
-glog.V(logger.Detail).Infof("'whatsup wheresdaparty' sent to %v", p)
+log.Trace(fmt.Sprintf("'whatsup wheresdaparty' sent to %v", p))

 return nil
 }

 // called after peer disconnected
 func (self *Hive) removePeer(p *peer) {
-glog.V(logger.Debug).Infof("bee %v removed", p)
+log.Debug(fmt.Sprintf("bee %v removed", p))
 self.kad.Off(p, saveSync)
 select {
 case self.more <- true:
 default:
 }
 if self.kad.Count() == 0 {
-glog.V(logger.Debug).Infof("empty, all bees gone")
+log.Debug(fmt.Sprintf("empty, all bees gone"))
 }
 }

@@ -265,7 +264,7 @@ func (self *Hive) getPeers(target storage.Key, max int) (peers []*peer) {

 // disconnects all the peers
 func (self *Hive) DropAll() {
-glog.V(logger.Info).Infof("dropping all bees")
+log.Info(fmt.Sprintf("dropping all bees"))
 for _, node := range self.kad.FindClosest(kademlia.Address{}, 0) {
 node.Drop()
 }
@@ -290,7 +289,7 @@ func (self *Hive) HandlePeersMsg(req *peersMsgData, from *peer) {
 var nrs []*kademlia.NodeRecord
 for _, p := range req.Peers {
 if err := netutil.CheckRelayIP(from.remoteAddr.IP, p.IP); err != nil {
-glog.V(logger.Detail).Infof("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err)
+log.Trace(fmt.Sprintf("invalid peer IP %v from %v: %v", from.remoteAddr.IP, p.IP, err))
 continue
 }
 nrs = append(nrs, newNodeRecord(p))
@@ -326,7 +325,7 @@ func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error {
 return fmt.Errorf("invalid type")
 }
 if record.Meta == nil {
-glog.V(logger.Debug).Infof("no sync state for node record %v setting default", record)
+log.Debug(fmt.Sprintf("no sync state for node record %v setting default", record))
 p.syncState = &syncState{DbSyncState: &storage.DbSyncState{}}
 return nil
 }
@@ -334,7 +333,7 @@ func loadSync(record *kademlia.NodeRecord, node kademlia.Node) error {
 if err != nil {
 return fmt.Errorf("error decoding kddb record meta info into a sync state: %v", err)
 }
-glog.V(logger.Detail).Infof("sync state for node record %v read from Meta: %s", record, string(*(record.Meta)))
+log.Trace(fmt.Sprintf("sync state for node record %v read from Meta: %s", record, string(*(record.Meta))))
 p.syncState = state
 return err
 }
@@ -344,10 +343,10 @@ func saveSync(record *kademlia.NodeRecord, node kademlia.Node) {
 if p, ok := node.(*peer); ok {
 meta, err := encodeSync(p.syncState)
 if err != nil {
-glog.V(logger.Warn).Infof("error saving sync state for %v: %v", node, err)
+log.Warn(fmt.Sprintf("error saving sync state for %v: %v", node, err))
 return
 }
-glog.V(logger.Detail).Infof("saved sync state for %v: %s", node, string(*meta))
+log.Trace(fmt.Sprintf("saved sync state for %v: %s", node, string(*meta)))
 record.Meta = meta
 }
 }
@@ -370,7 +369,7 @@ func (self *Hive) peers(req *retrieveRequestMsgData) {
 for _, peer := range self.getPeers(key, int(req.MaxPeers)) {
 addrs = append(addrs, peer.remoteAddr)
 }
-glog.V(logger.Debug).Infof("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log())
+log.Debug(fmt.Sprintf("Hive sending %d peer addresses to %v. req.Id: %v, req.Key: %v", len(addrs), req.from, req.Id, req.Key.Log()))

 peersData := &peersMsgData{
 Peers: addrs,
@@ -24,8 +24,7 @@ import (
 "sync"
 "time"

-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 )

 type NodeData interface {
@@ -88,12 +87,12 @@ func (self *KadDb) findOrCreate(index int, a Address, url string) *NodeRecord {
 Addr: a,
 Url: url,
 }
-glog.V(logger.Info).Infof("add new record %v to kaddb", record)
+log.Info(fmt.Sprintf("add new record %v to kaddb", record))
 // insert in kaddb
 self.index[a] = record
 self.Nodes[index] = append(self.Nodes[index], record)
 } else {
-glog.V(logger.Info).Infof("found record %v in kaddb", record)
+log.Info(fmt.Sprintf("found record %v in kaddb", record))
 }
 // update last seen time
 record.setSeen()
@@ -121,13 +120,13 @@ func (self *KadDb) add(nrs []*NodeRecord, proximityBin func(Address) int) {
 copy(newnodes[:], nodes[:dbcursor])
 newnodes[dbcursor] = node
 copy(newnodes[dbcursor+1:], nodes[dbcursor:])
-glog.V(logger.Detail).Infof("new nodes: %v (keys: %v)\nnodes: %v", newnodes, nodes)
+log.Trace(fmt.Sprintf("new nodes: %v (keys: %v)\nnodes: %v", newnodes, nodes))
 self.Nodes[index] = newnodes
 n++
 }
 }
 if n > 0 {
-glog.V(logger.Debug).Infof("%d/%d node records (new/known)", n, len(nrs))
+log.Debug(fmt.Sprintf("%d/%d node records (new/known)", n, len(nrs)))
 }
 }

@@ -207,13 +206,13 @@ func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRe

 // skip already connected nodes
 if node.node != nil {
-glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow))
+log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d/%d) already connected", node.Addr, po, cursor, len(dbrow)))
 continue ROW
 }

 // if node is scheduled to connect
 if time.Time(node.After).After(time.Now()) {
-glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)
+log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) skipped. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After))
 continue ROW
 }

@@ -224,17 +223,17 @@ func (self *KadDb) findBest(maxBinSize int, binSize func(int) int) (node *NodeRe
 if delta > self.purgeInterval {
 // remove node
 purge[cursor] = true
-glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen)
+log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) unreachable since %v. Removed", node.Addr, po, cursor, node.Seen))
 continue ROW
 }

-glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After)
+log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) ready to be tried. seen at %v (%v ago), scheduled at %v", node.Addr, po, cursor, node.Seen, delta, node.After))

 // scheduling next check
 interval = time.Duration(delta * time.Duration(self.connRetryExp))
 after = time.Now().Add(interval)

-glog.V(logger.Debug).Infof("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval)
+log.Debug(fmt.Sprintf("kaddb record %v (PO%03d:%d) selected as candidate connection %v. seen at %v (%v ago), selectable since %v, retry after %v (in %v)", node.Addr, po, cursor, rounds, node.Seen, delta, node.After, after, interval))
 node.After = after
 found = true
 } // ROW
@@ -295,9 +294,9 @@ func (self *KadDb) save(path string, cb func(*NodeRecord, Node)) error {
 }
 err = ioutil.WriteFile(path, data, os.ModePerm)
 if err != nil {
-glog.V(logger.Warn).Infof("unable to save kaddb with %v nodes to %v: err", n, path, err)
+log.Warn(fmt.Sprintf("unable to save kaddb with %v nodes to %v: err", n, path, err))
 } else {
-glog.V(logger.Info).Infof("saved kaddb with %v nodes to %v", n, path)
+log.Info(fmt.Sprintf("saved kaddb with %v nodes to %v", n, path))
 }
 return err
 }
@@ -338,7 +337,7 @@ func (self *KadDb) load(path string, cb func(*NodeRecord, Node) error) (err erro
 }
 self.delete(po, purge)
 }
-glog.V(logger.Info).Infof("loaded kaddb with %v nodes from %v", n, path)
+log.Info(fmt.Sprintf("loaded kaddb with %v nodes from %v", n, path))

 return
 }
@@ -23,8 +23,7 @@ import (
 "sync"
 "time"

-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 )

 const (
@@ -117,7 +116,7 @@ func (self *Kademlia) DBCount() int {
 // On is the entry point called when a new nodes is added
 // unsafe in that node is not checked to be already active node (to be called once)
 func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error) {
-glog.V(logger.Warn).Infof("%v", self)
+log.Warn(fmt.Sprintf("%v", self))
 defer self.lock.Unlock()
 self.lock.Lock()

@@ -126,11 +125,11 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error

 if cb != nil {
 err = cb(record, node)
-glog.V(logger.Detail).Infof("cb(%v, %v) ->%v", record, node, err)
+log.Trace(fmt.Sprintf("cb(%v, %v) ->%v", record, node, err))
 if err != nil {
 return fmt.Errorf("unable to add node %v, callback error: %v", node.Addr(), err)
 }
-glog.V(logger.Debug).Infof("add node record %v with node %v", record, node)
+log.Debug(fmt.Sprintf("add node record %v with node %v", record, node))
 }

 // insert in kademlia table of active nodes
@@ -139,7 +138,7 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error
 // TODO: give priority to peers with active traffic
 if len(bucket) < self.BucketSize { // >= allows us to add peers beyond the bucketsize limitation
 self.buckets[index] = append(bucket, node)
-glog.V(logger.Debug).Infof("add node %v to table", node)
+log.Debug(fmt.Sprintf("add node %v to table", node))
 self.setProxLimit(index, true)
 record.node = node
 self.count++
@@ -159,10 +158,10 @@ func (self *Kademlia) On(node Node, cb func(*NodeRecord, Node) error) (err error
 }
 }
 if replaced == nil {
-glog.V(logger.Debug).Infof("all peers wanted, PO%03d bucket full", index)
+log.Debug(fmt.Sprintf("all peers wanted, PO%03d bucket full", index))
 return fmt.Errorf("bucket full")
 }
-glog.V(logger.Debug).Infof("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval)
+log.Debug(fmt.Sprintf("node %v replaced by %v (idle for %v > %v)", replaced, node, idle, self.MaxIdleInterval))
 replaced.Drop()
 // actually replace in the row. When off(node) is called, the peer is no longer in the row
 bucket[pos] = node
@@ -195,7 +194,7 @@ func (self *Kademlia) Off(node Node, cb func(*NodeRecord, Node)) (err error) {
 }
 record.node = nil
 self.count--
-glog.V(logger.Debug).Infof("remove node %v from table, population now is %v", node, self.count)
+log.Debug(fmt.Sprintf("remove node %v from table, population now is %v", node, self.count))

 return
 }
@@ -223,7 +222,7 @@ func (self *Kademlia) setProxLimit(r int, on bool) {
 self.proxLimit++
 curr = len(self.buckets[self.proxLimit])

-glog.V(logger.Detail).Infof("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)
+log.Trace(fmt.Sprintf("proxbin contraction (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r))
 }
 return
 }
@@ -237,7 +236,7 @@ func (self *Kademlia) setProxLimit(r int, on bool) {
 //
 self.proxLimit--
 self.proxSize += len(self.buckets[self.proxLimit])
-glog.V(logger.Detail).Infof("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r)
+log.Trace(fmt.Sprintf("proxbin expansion (size: %v, limit: %v, bin: %v)", self.proxSize, self.proxLimit, r))
 }
 }

@@ -257,7 +256,7 @@ func (self *Kademlia) FindClosest(target Address, max int) []Node {
 po := self.proximityBin(target)
 index := po
 step := 1
-glog.V(logger.Detail).Infof("serving %v nodes at %v (PO%02d)", max, index, po)
+log.Trace(fmt.Sprintf("serving %v nodes at %v (PO%02d)", max, index, po))

 // if max is set to 0, just want a full bucket, dynamic number
 min := max
@@ -276,7 +275,7 @@ func (self *Kademlia) FindClosest(target Address, max int) []Node {
 n++
 }
 // terminate if index reached the bottom or enough peers > min
-glog.V(logger.Detail).Infof("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po)
+log.Trace(fmt.Sprintf("add %v -> %v (PO%02d, PO%03d)", len(self.buckets[index]), n, index, po))
 if n >= min && (step < 0 || max == 0) {
 break
 }
@@ -287,7 +286,7 @@ func (self *Kademlia) FindClosest(target Address, max int) []Node {
 }
 index += step
 }
-glog.V(logger.Detail).Infof("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po)
+log.Trace(fmt.Sprintf("serve %d (<=%d) nodes for target lookup %v (PO%03d)", n, max, target, po))
 return r.nodes
 }

@@ -38,8 +38,7 @@ import (

 "github.com/ethereum/go-ethereum/contracts/chequebook"
 "github.com/ethereum/go-ethereum/errs"
-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/p2p"
 "github.com/ethereum/go-ethereum/p2p/discover"
 bzzswap "github.com/ethereum/go-ethereum/swarm/services/swap"
@@ -201,7 +200,7 @@ func run(requestDb *storage.LDBDatabase, depo StorageHandler, backend chequebook
 // the main forever loop that handles incoming requests
 for {
 if self.hive.blockRead {
-glog.V(logger.Warn).Infof("Cannot read network")
+log.Warn(fmt.Sprintf("Cannot read network"))
 time.Sleep(100 * time.Millisecond)
 continue
 }
@@ -221,7 +220,7 @@ func (self *bzz) Drop() {
 // one cycle of the main forever loop that handles and dispatches incoming messages
 func (self *bzz) handle() error {
 msg, err := self.rw.ReadMsg()
-glog.V(logger.Debug).Infof("<- %v", msg)
+log.Debug(fmt.Sprintf("<- %v", msg))
 if err != nil {
 return err
 }
@@ -236,7 +235,7 @@ func (self *bzz) handle() error {
 case statusMsg:
 // no extra status message allowed. The one needed already handled by
 // handleStatus
-glog.V(logger.Debug).Infof("Status message: %v", msg)
+log.Debug(fmt.Sprintf("Status message: %v", msg))
 return self.protoError(ErrExtraStatusMsg, "")

 case storeRequestMsg:
@@ -250,7 +249,7 @@ func (self *bzz) handle() error {
 }
 // last Active time is set only when receiving chunks
 self.lastActive = time.Now()
-glog.V(logger.Detail).Infof("incoming store request: %s", req.String())
+log.Trace(fmt.Sprintf("incoming store request: %s", req.String()))
 // swap accounting is done within forwarding
 self.storage.HandleStoreRequestMsg(&req, &peer{bzz: self})

@@ -263,7 +262,7 @@ func (self *bzz) handle() error {
 req.from = &peer{bzz: self}
 // if request is lookup and not to be delivered
 if req.isLookup() {
-glog.V(logger.Detail).Infof("self lookup for %v: responding with peers only...", req.from)
+log.Trace(fmt.Sprintf("self lookup for %v: responding with peers only...", req.from))
 } else if req.Key == nil {
 return self.protoError(ErrDecode, "protocol handler: req.Key == nil || req.Timeout == nil")
 } else {
@@ -281,7 +280,7 @@ func (self *bzz) handle() error {
 return self.protoError(ErrDecode, "<- %v: %v", msg, err)
 }
 req.from = &peer{bzz: self}
-glog.V(logger.Detail).Infof("<- peer addresses: %v", req)
+log.Trace(fmt.Sprintf("<- peer addresses: %v", req))
 self.hive.HandlePeersMsg(&req, &peer{bzz: self})

 case syncRequestMsg:
@@ -289,7 +288,7 @@ func (self *bzz) handle() error {
 if err := msg.Decode(&req); err != nil {
 return self.protoError(ErrDecode, "<- %v: %v", msg, err)
 }
-glog.V(logger.Debug).Infof("<- sync request: %v", req)
+log.Debug(fmt.Sprintf("<- sync request: %v", req))
 self.lastActive = time.Now()
 self.sync(req.SyncState)

@@ -299,7 +298,7 @@ func (self *bzz) handle() error {
 if err := msg.Decode(&req); err != nil {
 return self.protoError(ErrDecode, "<- %v: %v", msg, err)
 }
-glog.V(logger.Debug).Infof("<- unsynced keys : %s", req.String())
+log.Debug(fmt.Sprintf("<- unsynced keys : %s", req.String()))
 err := self.storage.HandleUnsyncedKeysMsg(&req, &peer{bzz: self})
 self.lastActive = time.Now()
 if err != nil {
@@ -313,7 +312,7 @@ func (self *bzz) handle() error {
 if err := msg.Decode(&req); err != nil {
 return self.protoError(ErrDecode, "<-msg %v: %v", msg, err)
 }
-glog.V(logger.Debug).Infof("<- delivery request: %s", req.String())
+log.Debug(fmt.Sprintf("<- delivery request: %s", req.String()))
 err := self.storage.HandleDeliveryRequestMsg(&req, &peer{bzz: self})
 self.lastActive = time.Now()
 if err != nil {
@@ -327,7 +326,7 @@ func (self *bzz) handle() error {
 if err := msg.Decode(&req); err != nil {
 return self.protoError(ErrDecode, "<- %v: %v", msg, err)
 }
-glog.V(logger.Debug).Infof("<- payment: %s", req.String())
+log.Debug(fmt.Sprintf("<- payment: %s", req.String()))
 self.swap.Receive(int(req.Units), req.Promise)
 }

@@ -385,7 +384,7 @@ func (self *bzz) handleStatus() (err error) {
 }

 self.remoteAddr = self.peerAddr(status.Addr)
-glog.V(logger.Detail).Infof("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr())
+log.Trace(fmt.Sprintf("self: advertised IP: %v, peer advertised: %v, local address: %v\npeer: advertised IP: %v, remote address: %v\n", self.selfAddr(), self.remoteAddr, self.peer.LocalAddr(), status.Addr.IP, self.peer.RemoteAddr()))

 if self.swapEnabled {
 // set remote profile for accounting
@@ -395,14 +394,14 @@ func (self *bzz) handleStatus() (err error) {
 }
 }

-glog.V(logger.Info).Infof("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId)
+log.Info(fmt.Sprintf("Peer %08x is capable (%d/%d)", self.remoteAddr.Addr[:4], status.Version, status.NetworkId))
 err = self.hive.addPeer(&peer{bzz: self})
 if err != nil {
 return self.protoError(ErrUnwanted, "%v", err)
 }

 // hive sets syncstate so sync should start after node added
-glog.V(logger.Info).Infof("syncronisation request sent with %v", self.syncState)
+log.Info(fmt.Sprintf("syncronisation request sent with %v", self.syncState))
 self.syncRequest()

 return nil
@@ -421,7 +420,7 @@ func (self *bzz) sync(state *syncState) error {
 // an explicitly received nil syncstate disables syncronisation
 if state == nil {
 self.syncEnabled = false
-glog.V(logger.Warn).Infof("syncronisation disabled for peer %v", self)
+log.Warn(fmt.Sprintf("syncronisation disabled for peer %v", self))
 state = &syncState{DbSyncState: &storage.DbSyncState{}, Synced: true}
 } else {
 state.synced = make(chan bool)
@@ -430,7 +429,7 @@ func (self *bzz) sync(state *syncState) error {
 state.Start = storage.Key(start[:])
 state.Stop = storage.Key(stop[:])
 }
-glog.V(logger.Debug).Infof("syncronisation requested by peer %v at state %v", self, state)
+log.Debug(fmt.Sprintf("syncronisation requested by peer %v at state %v", self, state))
 }
 var err error
 self.syncer, err = newSyncer(
@@ -443,7 +442,7 @@ func (self *bzz) sync(state *syncState) error {
 if err != nil {
 return self.protoError(ErrSync, "%v", err)
 }
-glog.V(logger.Detail).Infof("syncer set for peer %v", self)
+log.Trace(fmt.Sprintf("syncer set for peer %v", self))
 return nil
 }

@@ -490,11 +489,11 @@ func (self *bzz) store(req *storeRequestMsgData) error {
 func (self *bzz) syncRequest() error {
 req := &syncRequestMsgData{}
 if self.hive.syncEnabled {
-glog.V(logger.Debug).Infof("syncronisation request to peer %v at state %v", self, self.syncState)
+log.Debug(fmt.Sprintf("syncronisation request to peer %v at state %v", self, self.syncState))
 req.SyncState = self.syncState
 }
 if self.syncState == nil {
-glog.V(logger.Warn).Infof("syncronisation disabled for peer %v at state %v", self, self.syncState)
+log.Warn(fmt.Sprintf("syncronisation disabled for peer %v at state %v", self, self.syncState))
 }
 return self.send(syncRequestMsg, req)
 }
@@ -534,7 +533,7 @@ func (self *bzz) peers(req *peersMsgData) error {

 func (self *bzz) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
 err = self.errors.New(code, format, params...)
-err.Log(glog.V(logger.Info))
+log.Info(err.Error())
 return
 }

@@ -542,7 +541,7 @@ func (self *bzz) send(msg uint64, data interface{}) error {
 if self.hive.blockWrite {
 return fmt.Errorf("network write blocked")
 }
-glog.V(logger.Detail).Infof("-> %v: %v (%T) to %v", msg, data, data, self)
+log.Trace(fmt.Sprintf("-> %v: %v (%T) to %v", msg, data, data, self))
 err := p2p.Send(self.rw, msg, data)
 if err != nil {
 self.Drop()
@@ -20,8 +20,7 @@ import (
 "encoding/binary"
 "fmt"

-"github.com/ethereum/go-ethereum/logger"
-"github.com/ethereum/go-ethereum/logger/glog"
+"github.com/ethereum/go-ethereum/log"
 "github.com/ethereum/go-ethereum/swarm/storage"
 "github.com/syndtr/goleveldb/leveldb"
 "github.com/syndtr/goleveldb/leveldb/iterator"
@@ -80,7 +79,7 @@ func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSi
 batch: make(chan chan int),
 dbBatchSize: dbBatchSize,
 }
-glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority)
+log.Trace(fmt.Sprintf("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority))

 // starts the main forever loop reading from buffer
 go syncdb.bufferRead(deliver)
@@ -126,9 +125,9 @@ func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
 var counter uint64
 if err == nil {
 counter = binary.BigEndian.Uint64(data)
-glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter)
+log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter))
 } else {
-glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter)
+log.Trace(fmt.Sprintf("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter))
 }

 LOOP:
@@ -142,7 +141,7 @@ LOOP:
 // if syncdb is stopped. In this case we need to save the item to the db
 more = deliver(req, self.quit)
 if !more {
-glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
+log.Debug(fmt.Sprintf("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
 // received quit signal, save request currently waiting delivery
 // by switching to db mode and closing the buffer
 buffer = nil
@@ -152,12 +151,12 @@ LOOP:
 break // break from select, this item will be written to the db
 }
 self.total++
-glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
+log.Trace(fmt.Sprintf("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total))
 // by the time deliver returns, there were new writes to the buffer
 // if buffer contention is detected, switch to db mode which drains
 // the buffer so no process will block on pushing store requests
 if len(buffer) == cap(buffer) {
-glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total)
+log.Debug(fmt.Sprintf("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total))
 buffer = nil
 db = self.buffer
 }
@@ -170,7 +169,7 @@ LOOP:
 binary.BigEndian.PutUint64(counterValue, counter)
 batch.Put(self.counterKey, counterValue) // persist counter in batch
 self.writeSyncBatch(batch) // save batch
-glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority)
+log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority))
 break LOOP
 }
 self.dbTotal++
|
||||
@ -181,7 +180,7 @@ LOOP:
|
||||
if inBatch == 0 && quit != nil {
|
||||
// there was no writes since the last batch so db depleted
|
||||
// switch to buffer mode
|
||||
glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority)
|
||||
log.Debug(fmt.Sprintf("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority))
|
||||
db = nil
|
||||
buffer = self.buffer
|
||||
dbSize <- 0 // indicates to 'caller' that batch has been written
|
||||
@ -190,7 +189,7 @@ LOOP:
|
||||
}
|
||||
binary.BigEndian.PutUint64(counterValue, counter)
|
||||
batch.Put(self.counterKey, counterValue)
|
||||
glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue)
|
||||
log.Debug(fmt.Sprintf("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue))
|
||||
batch = self.writeSyncBatch(batch)
|
||||
dbSize <- inBatch // indicates to 'caller' that batch has been written
|
||||
inBatch = 0
|
||||
@ -202,7 +201,7 @@ LOOP:
|
||||
db = self.buffer
|
||||
buffer = nil
|
||||
quit = nil
|
||||
glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority)
|
||||
log.Trace(fmt.Sprintf("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority))
|
||||
close(db)
|
||||
continue LOOP
|
||||
}
|
||||
@ -210,15 +209,15 @@ LOOP:
|
||||
// only get here if we put req into db
|
||||
entry, err = self.newSyncDbEntry(req, counter)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err)
|
||||
log.Warn(fmt.Sprintf("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err))
|
||||
continue LOOP
|
||||
}
|
||||
batch.Put(entry.key, entry.val)
|
||||
glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter)
|
||||
log.Trace(fmt.Sprintf("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter))
|
||||
// if just switched to db mode and not quitting, then launch dbRead
|
||||
// in a parallel go routine to send deliveries from db
|
||||
if inDb == 0 && quit != nil {
|
||||
glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead")
|
||||
log.Trace(fmt.Sprintf("syncDb[%v/%v] start dbRead"))
|
||||
go self.dbRead(true, counter, deliver)
|
||||
}
|
||||
inDb++
|
||||
@ -229,7 +228,7 @@ LOOP:
|
||||
batch = self.writeSyncBatch(batch)
|
||||
}
|
||||
}
|
||||
glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter)
|
||||
log.Info(fmt.Sprintf("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter))
|
||||
close(self.done)
|
||||
}
|
||||
|
||||
@ -237,7 +236,7 @@ LOOP:
|
||||
func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
|
||||
err := self.db.Write(batch)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err)
|
||||
log.Warn(fmt.Sprintf("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err))
|
||||
return batch
|
||||
}
|
||||
return new(leveldb.Batch)
|
||||
@ -311,7 +310,7 @@ func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}
|
||||
continue
|
||||
}
|
||||
del = new(leveldb.Batch)
|
||||
glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt)
|
||||
log.Trace(fmt.Sprintf("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt))
|
||||
|
||||
for n = 0; !useBatches || n < cnt; it.Next() {
|
||||
copy(key, it.Key())
|
||||
@ -323,11 +322,11 @@ func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}
|
||||
val := make([]byte, 40)
|
||||
copy(val, it.Value())
|
||||
entry = &syncDbEntry{key, val}
|
||||
// glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total)
|
||||
// log.Trace(fmt.Sprintf("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total))
|
||||
more = fun(entry, self.quit)
|
||||
if !more {
|
||||
// quit received when waiting to deliver entry, the entry will not be deleted
|
||||
glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt)
|
||||
log.Trace(fmt.Sprintf("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt))
|
||||
break
|
||||
}
|
||||
// since subsequent batches of the same db session are indexed incrementally
|
||||
@ -337,7 +336,7 @@ func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}
|
||||
n++
|
||||
total++
|
||||
}
|
||||
glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total)
|
||||
log.Debug(fmt.Sprintf("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total))
|
||||
self.db.Write(del) // this could be async called only when db is idle
|
||||
it.Release()
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package network
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -25,14 +26,12 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage"
|
||||
)
|
||||
|
||||
func init() {
|
||||
glog.SetV(0)
|
||||
glog.SetToStderr(true)
|
||||
log.Root().SetHandler(log.LvlFilterHandler(log.LvlCrit, log.StreamHandler(os.Stderr, log.TerminalFormat())))
|
||||
}
|
||||
|
||||
type testSyncDb struct {
|
||||
@ -83,7 +82,7 @@ func (self *testSyncDb) push(n int) {
|
||||
self.sent = append(self.sent, self.c)
|
||||
self.c++
|
||||
}
|
||||
glog.V(logger.Debug).Infof("pushed %v requests", n)
|
||||
log.Debug(fmt.Sprintf("pushed %v requests", n))
|
||||
}
|
||||
|
||||
func (self *testSyncDb) draindb() {
|
||||
@ -128,7 +127,7 @@ func (self *testSyncDb) expect(n int, db bool) {
|
||||
}
|
||||
if len(self.sent) > self.at && !bytes.Equal(crypto.Keccak256([]byte{byte(self.sent[self.at])}), self.delivered[self.at]) {
|
||||
self.t.Fatalf("expected delivery %v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
|
||||
glog.V(logger.Debug).Infof("%v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db)
|
||||
log.Debug(fmt.Sprintf("%v/%v/%v to be hash of %v, from db: %v = %v", i, n, self.at, self.sent[self.at], ok, db))
|
||||
}
|
||||
if !ok && db {
|
||||
self.t.Fatalf("expected delivery %v/%v/%v from db", i, n, self.at)
|
||||
|
@ -22,8 +22,7 @@ import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage"
|
||||
)
|
||||
|
||||
@ -209,7 +208,7 @@ func newSyncer(
|
||||
// initialise a syncdb instance for each priority queue
|
||||
self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i)))
|
||||
}
|
||||
glog.V(logger.Info).Infof("syncer started: %v", state)
|
||||
log.Info(fmt.Sprintf("syncer started: %v", state))
|
||||
// launch chunk delivery service
|
||||
go self.syncDeliveries()
|
||||
// launch sync task manager
|
||||
@ -270,14 +269,14 @@ func (self *syncer) sync() {
|
||||
|
||||
// 0. first replay stale requests from request db
|
||||
if state.SessionAt == 0 {
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: nothing to sync", self.key.Log())
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: nothing to sync", self.key.Log()))
|
||||
return
|
||||
}
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: start replaying stale requests from request db", self.key.Log())
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: start replaying stale requests from request db", self.key.Log()))
|
||||
for p := priorities - 1; p >= 0; p-- {
|
||||
self.queues[p].dbRead(false, 0, self.replay())
|
||||
}
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: done replaying stale requests from request db", self.key.Log())
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: done replaying stale requests from request db", self.key.Log()))
|
||||
|
||||
// unless peer is synced sync unfinished history beginning on
|
||||
if !state.Synced {
|
||||
@ -286,7 +285,7 @@ func (self *syncer) sync() {
|
||||
if !storage.IsZeroKey(state.Latest) {
|
||||
// 1. there is unfinished earlier sync
|
||||
state.Start = state.Latest
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state)
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state))
|
||||
// blocks while the entire history upto state is synced
|
||||
self.syncState(state)
|
||||
if state.Last < state.SessionAt {
|
||||
@ -298,7 +297,7 @@ func (self *syncer) sync() {
|
||||
// 2. sync up to last disconnect1
|
||||
if state.First < state.LastSeenAt {
|
||||
state.Last = state.LastSeenAt
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state)
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state))
|
||||
self.syncState(state)
|
||||
state.First = state.LastSeenAt
|
||||
}
|
||||
@ -313,11 +312,11 @@ func (self *syncer) sync() {
|
||||
// if there have been new chunks since last session
|
||||
if state.LastSeenAt < state.SessionAt {
|
||||
state.Last = state.SessionAt
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state)
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state))
|
||||
// blocks until state syncing is finished
|
||||
self.syncState(state)
|
||||
}
|
||||
glog.V(logger.Info).Infof("syncer[%v]: syncing all history complete", self.key.Log())
|
||||
log.Info(fmt.Sprintf("syncer[%v]: syncing all history complete", self.key.Log()))
|
||||
|
||||
}
|
||||
|
||||
@ -333,7 +332,7 @@ func (self *syncer) syncState(state *syncState) {
|
||||
// stop quits both request processor and saves the request cache to disk
|
||||
func (self *syncer) stop() {
|
||||
close(self.quit)
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: stop and save sync request db backlog", self.key.Log())
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: stop and save sync request db backlog", self.key.Log()))
|
||||
for _, db := range self.queues {
|
||||
db.stop()
|
||||
}
|
||||
@ -366,7 +365,7 @@ func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error)
|
||||
func (self *syncer) syncHistory(state *syncState) chan interface{} {
|
||||
var n uint
|
||||
history := make(chan interface{})
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop)
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop))
|
||||
it := self.dbAccess.iterator(state)
|
||||
if it != nil {
|
||||
go func() {
|
||||
@ -382,13 +381,13 @@ func (self *syncer) syncHistory(state *syncState) chan interface{} {
|
||||
// blocking until history channel is read from
|
||||
case history <- storage.Key(key):
|
||||
n++
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n))
|
||||
state.Latest = key
|
||||
case <-self.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n)
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n))
|
||||
}()
|
||||
}
|
||||
return history
|
||||
@ -438,14 +437,14 @@ LOOP:
|
||||
for priority = High; priority >= 0; priority-- {
|
||||
// the first priority channel that is non-empty will be assigned to keys
|
||||
if len(self.keys[priority]) > 0 {
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: reading request with priority %v", self.key.Log(), priority)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: reading request with priority %v", self.key.Log(), priority))
|
||||
keys = self.keys[priority]
|
||||
break PRIORITIES
|
||||
}
|
||||
glog.V(logger.Detail).Infof("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low]))
|
||||
log.Trace(fmt.Sprintf("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low])))
|
||||
// if the input queue is empty on this level, resort to history if there is any
|
||||
if uint(priority) == histPrior && history != nil {
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: reading history for %v", self.key.Log(), self.key)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: reading history for %v", self.key.Log(), self.key))
|
||||
keys = history
|
||||
break PRIORITIES
|
||||
}
|
||||
@ -455,7 +454,7 @@ LOOP:
|
||||
// if peer ready to receive but nothing to send
|
||||
if keys == nil && deliveryRequest == nil {
|
||||
// if no items left and switch to waiting mode
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: buffers consumed. Waiting", self.key.Log())
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: buffers consumed. Waiting", self.key.Log()))
|
||||
newUnsyncedKeys = self.newUnsyncedKeys
|
||||
}
|
||||
|
||||
@ -476,15 +475,15 @@ LOOP:
|
||||
// (all nonhistorical outgoing traffic sheduled and persisted
|
||||
state.LastSeenAt = self.dbAccess.counter()
|
||||
state.Latest = storage.ZeroKey
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: sending %v", self.key.Log(), unsynced)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: sending %v", self.key.Log(), unsynced))
|
||||
// send the unsynced keys
|
||||
stateCopy := *state
|
||||
err := self.unsyncedKeys(unsynced, &stateCopy)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("syncer[%v]: unable to send unsynced keys: %v", err)
|
||||
log.Warn(fmt.Sprintf("syncer[%v]: unable to send unsynced keys: %v", err))
|
||||
}
|
||||
self.state = state
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy)
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy))
|
||||
unsynced = nil
|
||||
keys = nil
|
||||
}
|
||||
@ -495,7 +494,7 @@ LOOP:
|
||||
break LOOP
|
||||
case req, more = <-keys:
|
||||
if keys == history && !more {
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: syncing history segment complete", self.key.Log())
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: syncing history segment complete", self.key.Log()))
|
||||
// history channel is closed, waiting for new state (called from sync())
|
||||
syncStates = self.syncStates
|
||||
state.Synced = true // this signals that the current segment is complete
|
||||
@ -508,7 +507,7 @@ LOOP:
|
||||
history = nil
|
||||
}
|
||||
case <-deliveryRequest:
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: peer ready to receive", self.key.Log())
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: peer ready to receive", self.key.Log()))
|
||||
|
||||
// this 1 cap channel can wake up the loop
|
||||
// signaling that peer is ready to receive unsynced Keys
|
||||
@ -516,7 +515,7 @@ LOOP:
|
||||
deliveryRequest = nil
|
||||
|
||||
case <-newUnsyncedKeys:
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: new unsynced keys available", self.key.Log())
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: new unsynced keys available", self.key.Log()))
|
||||
// this 1 cap channel can wake up the loop
|
||||
// signals that data is available to send if peer is ready to receive
|
||||
newUnsyncedKeys = nil
|
||||
@ -526,11 +525,11 @@ LOOP:
|
||||
// this resets the state
|
||||
if !more {
|
||||
state = self.state
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state))
|
||||
state.Synced = true
|
||||
syncStates = nil
|
||||
} else {
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior))
|
||||
state.Synced = false
|
||||
history = self.syncHistory(state)
|
||||
// only one history at a time, only allow another one once the
|
||||
@ -542,19 +541,19 @@ LOOP:
|
||||
continue LOOP
|
||||
}
|
||||
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req))
|
||||
keyCounts[priority]++
|
||||
keyCount++
|
||||
if keys == history {
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
|
||||
historyCnt++
|
||||
}
|
||||
if sreq, err := self.newSyncRequest(req, priority); err == nil {
|
||||
// extract key from req
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
|
||||
unsynced = append(unsynced, sreq)
|
||||
} else {
|
||||
glog.V(logger.Warn).Infof("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err)
|
||||
log.Warn(fmt.Sprintf("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, state.Synced, err))
|
||||
}
|
||||
|
||||
}
|
||||
@ -601,18 +600,18 @@ func (self *syncer) syncDeliveries() {
|
||||
total++
|
||||
msg, err = self.newStoreRequestMsgData(req)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err)
|
||||
log.Warn(fmt.Sprintf("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err))
|
||||
} else {
|
||||
err = self.store(msg)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err)
|
||||
log.Warn(fmt.Sprintf("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err))
|
||||
} else {
|
||||
success++
|
||||
glog.V(logger.Detail).Infof("syncer[%v]: %v successfully delivered", self.key.Log(), req)
|
||||
log.Trace(fmt.Sprintf("syncer[%v]: %v successfully delivered", self.key.Log(), req))
|
||||
}
|
||||
}
|
||||
if total%self.SyncBatchSize == 0 {
|
||||
glog.V(logger.Debug).Infof("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low])
|
||||
log.Debug(fmt.Sprintf("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low]))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -679,7 +678,7 @@ func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool)
|
||||
func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool {
|
||||
msgdata, err := self.newStoreRequestMsgData(req)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("unable to deliver request %v: %v", msgdata, err)
|
||||
log.Warn(fmt.Sprintf("unable to deliver request %v: %v", msgdata, err))
|
||||
return false
|
||||
}
|
||||
select {
|
||||
|
@ -31,8 +31,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/contracts/chequebook/contract"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/swarm/services/swap/swap"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
@ -132,19 +131,19 @@ func NewSwap(local *SwapParams, remote *SwapProfile, backend chequebook.Backend,
|
||||
// TODO: monitoring a chequebooks events
|
||||
ok, err = chequebook.ValidateCode(ctx, backend, remote.Contract)
|
||||
if !ok {
|
||||
glog.V(logger.Info).Infof("invalid contract %v for peer %v: %v)", remote.Contract.Hex()[:8], proto, err)
|
||||
log.Info(fmt.Sprintf("invalid contract %v for peer %v: %v)", remote.Contract.Hex()[:8], proto, err))
|
||||
} else {
|
||||
// remote contract valid, create inbox
|
||||
in, err = chequebook.NewInbox(local.privateKey, remote.Contract, local.Beneficiary, crypto.ToECDSAPub(common.FromHex(remote.PublicKey)), backend)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("unable to set up inbox for chequebook contract %v for peer %v: %v)", remote.Contract.Hex()[:8], proto, err)
|
||||
log.Warn(fmt.Sprintf("unable to set up inbox for chequebook contract %v for peer %v: %v)", remote.Contract.Hex()[:8], proto, err))
|
||||
}
|
||||
}
|
||||
|
||||
// check if local chequebook contract is valid
|
||||
ok, err = chequebook.ValidateCode(ctx, backend, local.Contract)
|
||||
if !ok {
|
||||
glog.V(logger.Warn).Infof("unable to set up outbox for peer %v: chequebook contract (owner: %v): %v)", proto, local.owner.Hex(), err)
|
||||
log.Warn(fmt.Sprintf("unable to set up outbox for peer %v: chequebook contract (owner: %v): %v)", proto, local.owner.Hex(), err))
|
||||
} else {
|
||||
out = chequebook.NewOutbox(local.Chequebook(), remote.Beneficiary)
|
||||
}
|
||||
@ -172,7 +171,7 @@ func NewSwap(local *SwapParams, remote *SwapProfile, backend chequebook.Backend,
|
||||
} else {
|
||||
sell = "selling to peer disabled"
|
||||
}
|
||||
glog.V(logger.Warn).Infof("SWAP arrangement with <%v>: %v; %v)", proto, buy, sell)
|
||||
log.Warn(fmt.Sprintf("SWAP arrangement with <%v>: %v; %v)", proto, buy, sell))
|
||||
|
||||
return
|
||||
}
|
||||
@ -217,13 +216,13 @@ func (self *SwapParams) deployChequebook(ctx context.Context, backend chequebook
|
||||
opts.Value = self.AutoDepositBuffer
|
||||
opts.Context = ctx
|
||||
|
||||
glog.V(logger.Info).Infof("Deploying new chequebook (owner: %v)", opts.From.Hex())
|
||||
log.Info(fmt.Sprintf("Deploying new chequebook (owner: %v)", opts.From.Hex()))
|
||||
contract, err := deployChequebookLoop(opts, backend)
|
||||
if err != nil {
|
||||
glog.V(logger.Error).Infof("unable to deploy new chequebook: %v", err)
|
||||
log.Error(fmt.Sprintf("unable to deploy new chequebook: %v", err))
|
||||
return err
|
||||
}
|
||||
glog.V(logger.Info).Infof("new chequebook deployed at %v (owner: %v)", contract.Hex(), opts.From.Hex())
|
||||
log.Info(fmt.Sprintf("new chequebook deployed at %v (owner: %v)", contract.Hex(), opts.From.Hex()))
|
||||
|
||||
// need to save config at this point
|
||||
self.lock.Lock()
|
||||
@ -231,7 +230,7 @@ func (self *SwapParams) deployChequebook(ctx context.Context, backend chequebook
|
||||
err = self.newChequebookFromContract(path, backend)
|
||||
self.lock.Unlock()
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("error initialising cheque book (owner: %v): %v", opts.From.Hex(), err)
|
||||
log.Warn(fmt.Sprintf("error initialising cheque book (owner: %v): %v", opts.From.Hex(), err))
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -244,11 +243,11 @@ func deployChequebookLoop(opts *bind.TransactOpts, backend chequebook.Backend) (
|
||||
time.Sleep(chequebookDeployDelay)
|
||||
}
|
||||
if _, tx, _, err = contract.DeployChequebook(opts, backend); err != nil {
|
||||
glog.V(logger.Warn).Infof("can't send chequebook deploy tx (try %d): %v", try, err)
|
||||
log.Warn(fmt.Sprintf("can't send chequebook deploy tx (try %d): %v", try, err))
|
||||
continue
|
||||
}
|
||||
if addr, err = bind.WaitDeployed(opts.Context, backend, tx); err != nil {
|
||||
glog.V(logger.Warn).Infof("chequebook deploy error (try %d): %v", try, err)
|
||||
log.Warn(fmt.Sprintf("chequebook deploy error (try %d): %v", try, err))
|
||||
continue
|
||||
}
|
||||
return addr, nil
|
||||
@ -271,13 +270,13 @@ func (self *SwapParams) newChequebookFromContract(path string, backend chequeboo
|
||||
if err != nil {
|
||||
self.chbook, err = chequebook.NewChequebook(chbookpath, self.Contract, self.privateKey, backend)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("unable to initialise chequebook (owner: %v): %v", self.owner.Hex(), err)
|
||||
log.Warn(fmt.Sprintf("unable to initialise chequebook (owner: %v): %v", self.owner.Hex(), err))
|
||||
return fmt.Errorf("unable to initialise chequebook (owner: %v): %v", self.owner.Hex(), err)
|
||||
}
|
||||
}
|
||||
|
||||
self.chbook.AutoDeposit(self.AutoDepositInterval, self.AutoDepositThreshold, self.AutoDepositBuffer)
|
||||
glog.V(logger.Info).Infof("auto deposit ON for %v -> %v: interval = %v, threshold = %v, buffer = %v)", crypto.PubkeyToAddress(*(self.publicKey)).Hex()[:8], self.Contract.Hex()[:8], self.AutoDepositInterval, self.AutoDepositThreshold, self.AutoDepositBuffer)
|
||||
log.Info(fmt.Sprintf("auto deposit ON for %v -> %v: interval = %v, threshold = %v, buffer = %v)", crypto.PubkeyToAddress(*(self.publicKey)).Hex()[:8], self.Contract.Hex()[:8], self.AutoDepositInterval, self.AutoDepositThreshold, self.AutoDepositBuffer))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -23,8 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
// SwAP Swarm Accounting Protocol with
|
||||
@ -130,7 +129,7 @@ func (self *Swap) SetRemote(remote *Profile) {
|
||||
self.Buys = false
|
||||
}
|
||||
|
||||
glog.V(logger.Debug).Infof("<%v> remote profile set: pay at: %v, drop at: %v, buy at: %v, sell at: %v", self.proto, remote.PayAt, remote.DropAt, remote.BuyAt, remote.SellAt)
|
||||
log.Debug(fmt.Sprintf("<%v> remote profile set: pay at: %v, drop at: %v, buy at: %v, sell at: %v", self.proto, remote.PayAt, remote.DropAt, remote.BuyAt, remote.SellAt))
|
||||
|
||||
}
|
||||
|
||||
@ -148,15 +147,15 @@ func (self *Swap) setParams(local *Params) {
|
||||
|
||||
if self.Sells {
|
||||
self.In.AutoCash(local.AutoCashInterval, local.AutoCashThreshold)
|
||||
glog.V(logger.Info).Infof("<%v> set autocash to every %v, max uncashed limit: %v", self.proto, local.AutoCashInterval, local.AutoCashThreshold)
|
||||
log.Info(fmt.Sprintf("<%v> set autocash to every %v, max uncashed limit: %v", self.proto, local.AutoCashInterval, local.AutoCashThreshold))
|
||||
} else {
|
||||
glog.V(logger.Info).Infof("<%v> autocash off (not selling)", self.proto)
|
||||
log.Info(fmt.Sprintf("<%v> autocash off (not selling)", self.proto))
|
||||
}
|
||||
if self.Buys {
|
||||
self.Out.AutoDeposit(local.AutoDepositInterval, local.AutoDepositThreshold, local.AutoDepositBuffer)
|
||||
glog.V(logger.Info).Infof("<%v> set autodeposit to every %v, pay at: %v, buffer: %v", self.proto, local.AutoDepositInterval, local.AutoDepositThreshold, local.AutoDepositBuffer)
|
||||
log.Info(fmt.Sprintf("<%v> set autodeposit to every %v, pay at: %v, buffer: %v", self.proto, local.AutoDepositInterval, local.AutoDepositThreshold, local.AutoDepositBuffer))
|
||||
} else {
|
||||
glog.V(logger.Info).Infof("<%v> autodeposit off (not buying)", self.proto)
|
||||
log.Info(fmt.Sprintf("<%v> autodeposit off (not buying)", self.proto))
|
||||
}
|
||||
}
|
||||
|
||||
@ -168,16 +167,16 @@ func (self *Swap) Add(n int) error {
|
||||
self.lock.Lock()
|
||||
self.balance += n
|
||||
if !self.Sells && self.balance > 0 {
|
||||
glog.V(logger.Detail).Infof("<%v> remote peer cannot have debt (balance: %v)", self.proto, self.balance)
|
||||
log.Trace(fmt.Sprintf("<%v> remote peer cannot have debt (balance: %v)", self.proto, self.balance))
|
||||
self.proto.Drop()
|
||||
return fmt.Errorf("[SWAP] <%v> remote peer cannot have debt (balance: %v)", self.proto, self.balance)
|
||||
}
|
||||
if !self.Buys && self.balance < 0 {
|
||||
glog.V(logger.Detail).Infof("<%v> we cannot have debt (balance: %v)", self.proto, self.balance)
|
||||
log.Trace(fmt.Sprintf("<%v> we cannot have debt (balance: %v)", self.proto, self.balance))
|
||||
return fmt.Errorf("[SWAP] <%v> we cannot have debt (balance: %v)", self.proto, self.balance)
|
||||
}
|
||||
if self.balance >= int(self.local.DropAt) {
|
||||
glog.V(logger.Detail).Infof("<%v> remote peer has too much debt (balance: %v, disconnect threshold: %v)", self.proto, self.balance, self.local.DropAt)
|
||||
log.Trace(fmt.Sprintf("<%v> remote peer has too much debt (balance: %v, disconnect threshold: %v)", self.proto, self.balance, self.local.DropAt))
|
||||
self.proto.Drop()
|
||||
return fmt.Errorf("[SWAP] <%v> remote peer has too much debt (balance: %v, disconnect threshold: %v)", self.proto, self.balance, self.local.DropAt)
|
||||
} else if self.balance <= -int(self.remote.PayAt) {
|
||||
@ -201,9 +200,9 @@ func (self *Swap) send() {
|
||||
amount.Mul(amount, self.remote.SellAt)
|
||||
promise, err := self.Out.Issue(amount)
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("<%v> cannot issue cheque (amount: %v, channel: %v): %v", self.proto, amount, self.Out, err)
|
||||
log.Warn(fmt.Sprintf("<%v> cannot issue cheque (amount: %v, channel: %v): %v", self.proto, amount, self.Out, err))
|
||||
} else {
|
||||
glog.V(logger.Warn).Infof("<%v> cheque issued (amount: %v, channel: %v)", self.proto, amount, self.Out)
|
||||
log.Warn(fmt.Sprintf("<%v> cheque issued (amount: %v, channel: %v)", self.proto, amount, self.Out))
|
||||
self.proto.Pay(-self.balance, promise)
|
||||
self.balance = 0
|
||||
}
|
||||
@ -229,13 +228,13 @@ func (self *Swap) Receive(units int, promise Promise) error {
|
||||
return fmt.Errorf("invalid amount: %v = %v * %v (units sent in msg * agreed sale unit price) != %v (signed in cheque)", price, units, self.local.SellAt, amount)
|
||||
}
|
||||
if err != nil {
|
||||
glog.V(logger.Detail).Infof("<%v> invalid promise (amount: %v, channel: %v): %v", self.proto, amount, self.In, err)
|
||||
log.Trace(fmt.Sprintf("<%v> invalid promise (amount: %v, channel: %v): %v", self.proto, amount, self.In, err))
|
||||
return err
|
||||
}
|
||||
|
||||
// credit remote peer with units
|
||||
self.Add(-units)
|
||||
glog.V(logger.Detail).Infof("<%v> received promise (amount: %v, channel: %v): %v", self.proto, amount, self.In, promise)
|
||||
log.Trace(fmt.Sprintf("<%v> received promise (amount: %v, channel: %v): %v", self.proto, amount, self.In, promise))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -24,8 +24,7 @@ import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
type brokenLimitedReader struct {
|
||||
@ -92,14 +91,14 @@ func testStore(m ChunkStore, l int64, branches int64, t *testing.T) {
|
||||
go func(chunk *Chunk) {
|
||||
storedChunk, err := m.Get(chunk.Key)
|
||||
if err == notFound {
|
||||
glog.V(logger.Detail).Infof("chunk '%v' not found", chunk.Key.Log())
|
||||
log.Trace(fmt.Sprintf("chunk '%v' not found", chunk.Key.Log()))
|
||||
} else if err != nil {
|
||||
glog.V(logger.Detail).Infof("error retrieving chunk %v: %v", chunk.Key.Log(), err)
|
||||
log.Trace(fmt.Sprintf("error retrieving chunk %v: %v", chunk.Key.Log(), err))
|
||||
} else {
|
||||
chunk.SData = storedChunk.SData
|
||||
chunk.Size = storedChunk.Size
|
||||
}
|
||||
glog.V(logger.Detail).Infof("chunk '%v' not found", chunk.Key.Log())
|
||||
log.Trace(fmt.Sprintf("chunk '%v' not found", chunk.Key.Log()))
|
||||
close(chunk.C)
|
||||
}(ch)
|
||||
}
|
||||
|
@ -28,8 +28,7 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||||
@ -279,7 +278,7 @@ func (s *DbStore) Cleanup() {
|
||||
|
||||
data, err := s.db.Get(getDataKey(index.Idx))
|
||||
if err != nil {
|
||||
glog.V(logger.Warn).Infof("Chunk %x found but could not be accessed: %v", key[:], err)
|
||||
log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err))
|
||||
s.delete(index.Idx, getIndexKey(key[1:]))
|
||||
errorsFound++
|
||||
} else {
|
||||
@ -287,7 +286,7 @@ func (s *DbStore) Cleanup() {
|
||||
hasher.Write(data)
|
||||
hash := hasher.Sum(nil)
|
||||
if !bytes.Equal(hash, key[1:]) {
|
||||
glog.V(logger.Warn).Infof("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:])
|
||||
log.Warn(fmt.Sprintf("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:]))
|
||||
s.delete(index.Idx, getIndexKey(key[1:]))
|
||||
errorsFound++
|
||||
}
|
||||
@ -295,7 +294,7 @@ func (s *DbStore) Cleanup() {
|
||||
it.Next()
|
||||
}
|
||||
it.Release()
|
||||
glog.V(logger.Warn).Infof("Found %v errors out of %v entries", errorsFound, total)
|
||||
log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
|
||||
}
|
||||
|
||||
func (s *DbStore) delete(idx uint64, idxKey []byte) {
|
||||
@ -324,7 +323,7 @@ func (s *DbStore) Put(chunk *Chunk) {
|
||||
if chunk.dbStored != nil {
|
||||
close(chunk.dbStored)
|
||||
}
|
||||
glog.V(logger.Detail).Infof("Storing to DB: chunk already exists, only update access")
|
||||
log.Trace(fmt.Sprintf("Storing to DB: chunk already exists, only update access"))
|
||||
return // already exists, only update access
|
||||
}
|
||||
|
||||
@ -356,7 +355,7 @@ func (s *DbStore) Put(chunk *Chunk) {
|
||||
if chunk.dbStored != nil {
|
||||
close(chunk.dbStored)
|
||||
}
|
||||
glog.V(logger.Detail).Infof("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx)
|
||||
log.Trace(fmt.Sprintf("DbStore.Put: %v. db storage counter: %v ", chunk.Key.Log(), s.dataIdx))
|
||||
}
|
||||
|
||||
// try to find index; if found, update access cnt and return true
|
||||
@ -390,7 +389,7 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
|
||||
var data []byte
|
||||
data, err = s.db.Get(getDataKey(index.Idx))
|
||||
if err != nil {
|
||||
glog.V(logger.Detail).Infof("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err)
|
||||
log.Trace(fmt.Sprintf("DBStore: Chunk %v found but could not be accessed: %v", key.Log(), err))
|
||||
s.delete(index.Idx, getIndexKey(key))
|
||||
return
|
||||
}
|
||||
|
@ -18,12 +18,12 @@ package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
/*
|
||||
@ -131,17 +131,17 @@ func (self *DPA) retrieveLoop() {
|
||||
for i := 0; i < maxRetrieveProcesses; i++ {
|
||||
go self.retrieveWorker()
|
||||
}
|
||||
glog.V(logger.Detail).Infof("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses)
|
||||
log.Trace(fmt.Sprintf("dpa: retrieve loop spawning %v workers", maxRetrieveProcesses))
|
||||
}
|
||||
|
||||
func (self *DPA) retrieveWorker() {
|
||||
for chunk := range self.retrieveC {
|
||||
glog.V(logger.Detail).Infof("dpa: retrieve loop : chunk %v", chunk.Key.Log())
|
||||
log.Trace(fmt.Sprintf("dpa: retrieve loop : chunk %v", chunk.Key.Log()))
|
||||
storedChunk, err := self.Get(chunk.Key)
|
||||
if err == notFound {
|
||||
glog.V(logger.Detail).Infof("chunk %v not found", chunk.Key.Log())
|
||||
log.Trace(fmt.Sprintf("chunk %v not found", chunk.Key.Log()))
|
||||
} else if err != nil {
|
||||
glog.V(logger.Detail).Infof("error retrieving chunk %v: %v", chunk.Key.Log(), err)
|
||||
log.Trace(fmt.Sprintf("error retrieving chunk %v: %v", chunk.Key.Log(), err))
|
||||
} else {
|
||||
chunk.SData = storedChunk.SData
|
||||
chunk.Size = storedChunk.Size
|
||||
@ -162,7 +162,7 @@ func (self *DPA) storeLoop() {
|
||||
for i := 0; i < maxStoreProcesses; i++ {
|
||||
go self.storeWorker()
|
||||
}
|
||||
glog.V(logger.Detail).Infof("dpa: store spawning %v workers", maxStoreProcesses)
|
||||
log.Trace(fmt.Sprintf("dpa: store spawning %v workers", maxStoreProcesses))
|
||||
}
|
||||
|
||||
func (self *DPA) storeWorker() {
|
||||
@ -170,7 +170,7 @@ func (self *DPA) storeWorker() {
|
||||
for chunk := range self.storeC {
|
||||
self.Put(chunk)
|
||||
if chunk.wg != nil {
|
||||
glog.V(logger.Detail).Infof("dpa: store processor %v", chunk.Key.Log())
|
||||
log.Trace(fmt.Sprintf("dpa: store processor %v", chunk.Key.Log()))
|
||||
chunk.wg.Done()
|
||||
|
||||
}
|
||||
@ -203,17 +203,17 @@ func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) {
|
||||
chunk, err = self.netStore.Get(key)
|
||||
// timeout := time.Now().Add(searchTimeout)
|
||||
if chunk.SData != nil {
|
||||
glog.V(logger.Detail).Infof("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData))
|
||||
log.Trace(fmt.Sprintf("DPA.Get: %v found locally, %d bytes", key.Log(), len(chunk.SData)))
|
||||
return
|
||||
}
|
||||
// TODO: use self.timer time.Timer and reset with defer disableTimer
|
||||
timer := time.After(searchTimeout)
|
||||
select {
|
||||
case <-timer:
|
||||
glog.V(logger.Detail).Infof("DPA.Get: %v request time out ", key.Log())
|
||||
log.Trace(fmt.Sprintf("DPA.Get: %v request time out ", key.Log()))
|
||||
err = notFound
|
||||
case <-chunk.Req.C:
|
||||
glog.V(logger.Detail).Infof("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk)
|
||||
log.Trace(fmt.Sprintf("DPA.Get: %v retrieved, %d bytes (%p)", key.Log(), len(chunk.SData), chunk))
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -222,18 +222,18 @@ func (self *dpaChunkStore) Get(key Key) (chunk *Chunk, err error) {
|
||||
func (self *dpaChunkStore) Put(entry *Chunk) {
|
||||
chunk, err := self.localStore.Get(entry.Key)
|
||||
if err != nil {
|
||||
glog.V(logger.Detail).Infof("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log())
|
||||
log.Trace(fmt.Sprintf("DPA.Put: %v new chunk. call netStore.Put", entry.Key.Log()))
|
||||
chunk = entry
|
||||
} else if chunk.SData == nil {
|
||||
glog.V(logger.Detail).Infof("DPA.Put: %v request entry found", entry.Key.Log())
|
||||
log.Trace(fmt.Sprintf("DPA.Put: %v request entry found", entry.Key.Log()))
|
||||
chunk.SData = entry.SData
|
||||
chunk.Size = entry.Size
|
||||
} else {
|
||||
glog.V(logger.Detail).Infof("DPA.Put: %v chunk already known", entry.Key.Log())
|
||||
log.Trace(fmt.Sprintf("DPA.Put: %v chunk already known", entry.Key.Log()))
|
||||
return
|
||||
}
|
||||
// from this point on the storage logic is the same with network storage requests
|
||||
glog.V(logger.Detail).Infof("DPA.Put %v: %v", self.n, chunk.Key.Log())
|
||||
log.Trace(fmt.Sprintf("DPA.Put %v: %v", self.n, chunk.Key.Log()))
|
||||
self.n++
|
||||
self.netStore.Put(chunk)
|
||||
}
|
||||
|
@ -19,10 +19,10 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -287,11 +287,11 @@ func (s *MemStore) removeOldest() {
|
||||
}
|
||||
|
||||
if node.entry.dbStored != nil {
|
||||
glog.V(logger.Detail).Infof("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log())
|
||||
log.Trace(fmt.Sprintf("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log()))
|
||||
<-node.entry.dbStored
|
||||
glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log())
|
||||
log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log()))
|
||||
} else {
|
||||
glog.V(logger.Detail).Infof("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log())
|
||||
log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v already in DB. Ready to delete.", node.entry.Key.Log()))
|
||||
}
|
||||
|
||||
if node.entry.SData != nil {
|
||||
|
@ -17,12 +17,12 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
/*
|
||||
@ -98,14 +98,14 @@ func (self *NetStore) Put(entry *Chunk) {
|
||||
|
||||
// handle deliveries
|
||||
if entry.Req != nil {
|
||||
glog.V(logger.Detail).Infof("NetStore.Put: localStore.Put %v hit existing request...delivering", entry.Key.Log())
|
||||
log.Trace(fmt.Sprintf("NetStore.Put: localStore.Put %v hit existing request...delivering", entry.Key.Log()))
|
||||
// closing C signals to other routines (local requests)
|
||||
// that the chunk is has been retrieved
|
||||
close(entry.Req.C)
|
||||
// deliver the chunk to requesters upstream
|
||||
go self.cloud.Deliver(entry)
|
||||
} else {
|
||||
glog.V(logger.Detail).Infof("NetStore.Put: localStore.Put %v stored locally", entry.Key.Log())
|
||||
log.Trace(fmt.Sprintf("NetStore.Put: localStore.Put %v stored locally", entry.Key.Log()))
|
||||
// handle propagating store requests
|
||||
// go self.cloud.Store(entry)
|
||||
go self.cloud.Store(entry)
|
||||
@ -118,15 +118,15 @@ func (self *NetStore) Get(key Key) (*Chunk, error) {
|
||||
chunk, err := self.localStore.Get(key)
|
||||
if err == nil {
|
||||
if chunk.Req == nil {
|
||||
glog.V(logger.Detail).Infof("NetStore.Get: %v found locally", key)
|
||||
log.Trace(fmt.Sprintf("NetStore.Get: %v found locally", key))
|
||||
} else {
|
||||
glog.V(logger.Detail).Infof("NetStore.Get: %v hit on an existing request", key)
|
||||
log.Trace(fmt.Sprintf("NetStore.Get: %v hit on an existing request", key))
|
||||
// no need to launch again
|
||||
}
|
||||
return chunk, err
|
||||
}
|
||||
// no data and no request status
|
||||
glog.V(logger.Detail).Infof("NetStore.Get: %v not found locally. open new request", key)
|
||||
log.Trace(fmt.Sprintf("NetStore.Get: %v not found locally. open new request", key))
|
||||
chunk = NewChunk(key, newRequestStatus(key))
|
||||
self.localStore.memStore.Put(chunk)
|
||||
go self.cloud.Retrieve(chunk)
|
||||
|
@ -26,8 +26,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/contracts/chequebook"
|
||||
"github.com/ethereum/go-ethereum/contracts/ens"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/node"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||
@ -88,7 +87,7 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
|
||||
privateKey: config.Swap.PrivateKey(),
|
||||
corsString: cors,
|
||||
}
|
||||
glog.V(logger.Debug).Infof("Setting up Swarm service components")
|
||||
log.Debug(fmt.Sprintf("Setting up Swarm service components"))
|
||||
|
||||
hash := storage.MakeHashFunc(config.ChunkerParams.Hash)
|
||||
self.lstore, err = storage.NewLocalStore(hash, config.StoreParams)
|
||||
@ -97,10 +96,10 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
|
||||
}
|
||||
|
||||
// setup local store
|
||||
glog.V(logger.Debug).Infof("Set up local storage")
|
||||
log.Debug(fmt.Sprintf("Set up local storage"))
|
||||
|
||||
self.dbAccess = network.NewDbAccess(self.lstore)
|
||||
glog.V(logger.Debug).Infof("Set up local db access (iterator/counter)")
|
||||
log.Debug(fmt.Sprintf("Set up local db access (iterator/counter)"))
|
||||
|
||||
// set up the kademlia hive
|
||||
self.hive = network.NewHive(
|
||||
@ -109,26 +108,26 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
|
||||
swapEnabled, // SWAP enabled
|
||||
syncEnabled, // syncronisation enabled
|
||||
)
|
||||
glog.V(logger.Debug).Infof("Set up swarm network with Kademlia hive")
|
||||
log.Debug(fmt.Sprintf("Set up swarm network with Kademlia hive"))
|
||||
|
||||
// setup cloud storage backend
|
||||
cloud := network.NewForwarder(self.hive)
|
||||
glog.V(logger.Debug).Infof("-> set swarm forwarder as cloud storage backend")
|
||||
log.Debug(fmt.Sprintf("-> set swarm forwarder as cloud storage backend"))
|
||||
// setup cloud storage internal access layer
|
||||
|
||||
self.storage = storage.NewNetStore(hash, self.lstore, cloud, config.StoreParams)
|
||||
glog.V(logger.Debug).Infof("-> swarm net store shared access layer to Swarm Chunk Store")
|
||||
log.Debug(fmt.Sprintf("-> swarm net store shared access layer to Swarm Chunk Store"))
|
||||
|
||||
// set up Depo (storage handler = cloud storage access layer for incoming remote requests)
|
||||
self.depo = network.NewDepo(hash, self.lstore, self.storage)
|
||||
glog.V(logger.Debug).Infof("-> REmote Access to CHunks")
|
||||
log.Debug(fmt.Sprintf("-> REmote Access to CHunks"))
|
||||
|
||||
// set up DPA, the cloud storage local access layer
|
||||
dpaChunkStore := storage.NewDpaChunkStore(self.lstore, self.storage)
|
||||
glog.V(logger.Debug).Infof("-> Local Access to Swarm")
|
||||
log.Debug(fmt.Sprintf("-> Local Access to Swarm"))
|
||||
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
|
||||
self.dpa = storage.NewDPA(dpaChunkStore, self.config.ChunkerParams)
|
||||
glog.V(logger.Debug).Infof("-> Content Store API")
|
||||
log.Debug(fmt.Sprintf("-> Content Store API"))
|
||||
|
||||
// set up high level api
|
||||
transactOpts := bind.NewKeyedTransactor(self.privateKey)
|
||||
@ -137,11 +136,11 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(logger.Debug).Infof("-> Swarm Domain Name Registrar @ address %v", config.EnsRoot.Hex())
|
||||
log.Debug(fmt.Sprintf("-> Swarm Domain Name Registrar @ address %v", config.EnsRoot.Hex()))
|
||||
|
||||
self.api = api.NewApi(self.dpa, self.dns)
|
||||
// Manifests for Smart Hosting
|
||||
glog.V(logger.Debug).Infof("-> Web3 virtual server API")
|
||||
log.Debug(fmt.Sprintf("-> Web3 virtual server API"))
|
||||
|
||||
return self, nil
|
||||
}
|
||||
@ -173,21 +172,21 @@ func (self *Swarm) Start(net *p2p.Server) error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to set chequebook for SWAP: %v", err)
|
||||
}
|
||||
glog.V(logger.Debug).Infof("-> cheque book for SWAP: %v", self.config.Swap.Chequebook())
|
||||
log.Debug(fmt.Sprintf("-> cheque book for SWAP: %v", self.config.Swap.Chequebook()))
|
||||
} else {
|
||||
glog.V(logger.Debug).Infof("SWAP disabled: no cheque book set")
|
||||
log.Debug(fmt.Sprintf("SWAP disabled: no cheque book set"))
|
||||
}
|
||||
|
||||
glog.V(logger.Warn).Infof("Starting Swarm service")
|
||||
log.Warn(fmt.Sprintf("Starting Swarm service"))
|
||||
self.hive.Start(
|
||||
discover.PubkeyID(&net.PrivateKey.PublicKey),
|
||||
func() string { return net.ListenAddr },
|
||||
connectPeer,
|
||||
)
|
||||
glog.V(logger.Info).Infof("Swarm network started on bzz address: %v", self.hive.Addr())
|
||||
log.Info(fmt.Sprintf("Swarm network started on bzz address: %v", self.hive.Addr()))
|
||||
|
||||
self.dpa.Start()
|
||||
glog.V(logger.Debug).Infof("Swarm DPA started")
|
||||
log.Debug(fmt.Sprintf("Swarm DPA started"))
|
||||
|
||||
// start swarm http proxy server
|
||||
if self.config.Port != "" {
|
||||
@ -195,10 +194,10 @@ func (self *Swarm) Start(net *p2p.Server) error {
|
||||
go httpapi.StartHttpServer(self.api, &httpapi.Server{Addr: addr, CorsString: self.corsString})
|
||||
}
|
||||
|
||||
glog.V(logger.Debug).Infof("Swarm http proxy started on port: %v", self.config.Port)
|
||||
log.Debug(fmt.Sprintf("Swarm http proxy started on port: %v", self.config.Port))
|
||||
|
||||
if self.corsString != "" {
|
||||
glog.V(logger.Debug).Infof("Swarm http proxy started with corsdomain:", self.corsString)
|
||||
log.Debug(fmt.Sprintf("Swarm http proxy started with corsdomain:", self.corsString))
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -279,7 +278,7 @@ func (self *Swarm) SetChequebook(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
glog.V(logger.Info).Infof("new chequebook set (%v): saving config file, resetting all connections in the hive", self.config.Swap.Contract.Hex())
|
||||
log.Info(fmt.Sprintf("new chequebook set (%v): saving config file, resetting all connections in the hive", self.config.Swap.Contract.Hex()))
|
||||
self.config.Save()
|
||||
self.hive.DropAll()
|
||||
return nil
|
||||
|
Reference in New Issue
Block a user