swarm: push tags integration - request flow
swarm/api: integrate tags to count chunks being split and stored swarm/api/http: integrate tags in middleware for HTTP `POST` calls and assert chunks being calculated and counted correctly swarm: remove deprecated and unused code, add swarm hash to DoneSplit signature, remove calls to the api client from the http package
This commit is contained in:
@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/swarm/api"
|
||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||
"github.com/ethereum/go-ethereum/swarm/log"
|
||||
"github.com/ethereum/go-ethereum/swarm/sctx"
|
||||
"github.com/ethereum/go-ethereum/swarm/spancontext"
|
||||
@ -86,6 +87,54 @@ func InitLoggingResponseWriter(h http.Handler) http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
// InitUploadTag creates a new tag for an upload to the local HTTP proxy
|
||||
// if a tag is not named using the SwarmTagHeaderName, a fallback name will be used
|
||||
// when the Content-Length header is set, an ETA on chunking will be available since the
|
||||
// number of chunks to be split is known in advance (not including enclosing manifest chunks)
|
||||
// the tag can later be accessed using the appropriate identifier in the request context
|
||||
func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
tagName string
|
||||
err error
|
||||
estimatedTotal int64 = 0
|
||||
contentType = r.Header.Get("Content-Type")
|
||||
headerTag = r.Header.Get(SwarmTagHeaderName)
|
||||
)
|
||||
if headerTag != "" {
|
||||
tagName = headerTag
|
||||
log.Trace("got tag name from http header", "tagName", tagName)
|
||||
} else {
|
||||
tagName = fmt.Sprintf("unnamed_tag_%d", time.Now().Unix())
|
||||
}
|
||||
|
||||
if !strings.Contains(contentType, "multipart") && r.ContentLength > 0 {
|
||||
log.Trace("calculating tag size", "contentType", contentType, "contentLength", r.ContentLength)
|
||||
uri := GetURI(r.Context())
|
||||
if uri != nil {
|
||||
log.Debug("got uri from context")
|
||||
if uri.Addr == "encrypt" {
|
||||
estimatedTotal = calculateNumberOfChunks(r.ContentLength, true)
|
||||
} else {
|
||||
estimatedTotal = calculateNumberOfChunks(r.ContentLength, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal)
|
||||
|
||||
t, err := tags.New(tagName, estimatedTotal)
|
||||
if err != nil {
|
||||
log.Error("error creating tag", "err", err, "tagName", tagName)
|
||||
}
|
||||
|
||||
log.Trace("setting tag id to context", "uid", t.Uid)
|
||||
ctx := sctx.SetTag(r.Context(), t.Uid)
|
||||
|
||||
h.ServeHTTP(w, r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
|
||||
func InstrumentOpenTracing(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
uri := GetURI(r.Context())
|
||||
|
@ -79,7 +79,7 @@ func respondTemplate(w http.ResponseWriter, r *http.Request, templateName, msg s
|
||||
}
|
||||
|
||||
func respondError(w http.ResponseWriter, r *http.Request, msg string, code int) {
|
||||
log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code)
|
||||
log.Info("respondError", "ruid", GetRUID(r.Context()), "uri", GetURI(r.Context()), "code", code, "msg", msg)
|
||||
respondTemplate(w, r, "error", msg, code)
|
||||
}
|
||||
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"mime"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
@ -38,7 +39,9 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/swarm/api"
|
||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||
"github.com/ethereum/go-ethereum/swarm/log"
|
||||
"github.com/ethereum/go-ethereum/swarm/sctx"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage/feed"
|
||||
"github.com/rs/cors"
|
||||
@ -60,6 +63,8 @@ var (
|
||||
getListFail = metrics.NewRegisteredCounter("api.http.get.list.fail", nil)
|
||||
)
|
||||
|
||||
const SwarmTagHeaderName = "x-swarm-tag"
|
||||
|
||||
type methodHandler map[string]http.Handler
|
||||
|
||||
func (m methodHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||
@ -94,6 +99,12 @@ func NewServer(api *api.API, corsString string) *Server {
|
||||
InstrumentOpenTracing,
|
||||
}
|
||||
|
||||
tagAdapter := Adapter(func(h http.Handler) http.Handler {
|
||||
return InitUploadTag(h, api.Tags)
|
||||
})
|
||||
|
||||
defaultPostMiddlewares := append(defaultMiddlewares, tagAdapter)
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/bzz:/", methodHandler{
|
||||
"GET": Adapt(
|
||||
@ -102,7 +113,7 @@ func NewServer(api *api.API, corsString string) *Server {
|
||||
),
|
||||
"POST": Adapt(
|
||||
http.HandlerFunc(server.HandlePostFiles),
|
||||
defaultMiddlewares...,
|
||||
defaultPostMiddlewares...,
|
||||
),
|
||||
"DELETE": Adapt(
|
||||
http.HandlerFunc(server.HandleDelete),
|
||||
@ -116,7 +127,7 @@ func NewServer(api *api.API, corsString string) *Server {
|
||||
),
|
||||
"POST": Adapt(
|
||||
http.HandlerFunc(server.HandlePostRaw),
|
||||
defaultMiddlewares...,
|
||||
defaultPostMiddlewares...,
|
||||
),
|
||||
})
|
||||
mux.Handle("/bzz-immutable:/", methodHandler{
|
||||
@ -230,6 +241,12 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
|
||||
ruid := GetRUID(r.Context())
|
||||
log.Debug("handle.post.raw", "ruid", ruid)
|
||||
|
||||
tagUid := sctx.GetTag(r.Context())
|
||||
tag, err := s.api.Tags.Get(tagUid)
|
||||
if err != nil {
|
||||
log.Error("handle post raw got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
|
||||
}
|
||||
|
||||
postRawCount.Inc(1)
|
||||
|
||||
toEncrypt := false
|
||||
@ -256,13 +273,16 @@ func (s *Server) HandlePostRaw(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
addr, _, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt)
|
||||
addr, wait, err := s.api.Store(r.Context(), r.Body, r.ContentLength, toEncrypt)
|
||||
if err != nil {
|
||||
postRawFail.Inc(1)
|
||||
respondError(w, r, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
wait(r.Context())
|
||||
tag.DoneSplit(addr)
|
||||
|
||||
log.Debug("stored content", "ruid", ruid, "key", addr)
|
||||
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
@ -311,7 +331,6 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
log.Debug("new manifest", "ruid", ruid, "key", addr)
|
||||
}
|
||||
|
||||
newAddr, err := s.api.UpdateManifest(r.Context(), addr, func(mw *api.ManifestWriter) error {
|
||||
switch contentType {
|
||||
case "application/x-tar":
|
||||
@ -334,6 +353,15 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
tagUid := sctx.GetTag(r.Context())
|
||||
tag, err := s.api.Tags.Get(tagUid)
|
||||
if err != nil {
|
||||
log.Error("got an error retrieving tag for DoneSplit", "tagUid", tagUid, "err", err)
|
||||
}
|
||||
|
||||
log.Debug("done splitting, setting tag total", "SPLIT", tag.Get(chunk.StateSplit), "TOTAL", tag.Total())
|
||||
tag.DoneSplit(newAddr)
|
||||
|
||||
log.Debug("stored content", "ruid", ruid, "key", newAddr)
|
||||
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
@ -342,7 +370,7 @@ func (s *Server) HandlePostFiles(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
func (s *Server) handleTarUpload(r *http.Request, mw *api.ManifestWriter) (storage.Address, error) {
|
||||
log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()))
|
||||
log.Debug("handle.tar.upload", "ruid", GetRUID(r.Context()), "tag", sctx.GetTag(r.Context()))
|
||||
|
||||
defaultPath := r.URL.Query().Get("defaultpath")
|
||||
|
||||
@ -837,6 +865,28 @@ func (s *Server) HandleGetFile(w http.ResponseWriter, r *http.Request) {
|
||||
http.ServeContent(w, r, fileName, time.Now(), newBufferedReadSeeker(reader, getFileBufferSize))
|
||||
}
|
||||
|
||||
// calculateNumberOfChunks calculates the number of chunks in an arbitrary content length
|
||||
func calculateNumberOfChunks(contentLength int64, isEncrypted bool) int64 {
|
||||
if contentLength < 4096 {
|
||||
return 1
|
||||
}
|
||||
branchingFactor := 128
|
||||
if isEncrypted {
|
||||
branchingFactor = 64
|
||||
}
|
||||
|
||||
dataChunks := math.Ceil(float64(contentLength) / float64(4096))
|
||||
totalChunks := dataChunks
|
||||
intermediate := dataChunks / float64(branchingFactor)
|
||||
|
||||
for intermediate > 1 {
|
||||
totalChunks += math.Ceil(intermediate)
|
||||
intermediate = intermediate / float64(branchingFactor)
|
||||
}
|
||||
|
||||
return int64(totalChunks) + 1
|
||||
}
|
||||
|
||||
// The size of buffer used for bufio.Reader on LazyChunkReader passed to
|
||||
// http.ServeContent in HandleGetFile.
|
||||
// Warning: This value influences the number of chunk requests and chunker join goroutines
|
||||
|
@ -44,7 +44,6 @@ import (
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/swarm/api"
|
||||
swarm "github.com/ethereum/go-ethereum/swarm/api/client"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage/feed"
|
||||
"github.com/ethereum/go-ethereum/swarm/testutil"
|
||||
@ -755,6 +754,7 @@ func testBzzTar(encrypted bool, t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req.Header.Add("Content-Type", "application/x-tar")
|
||||
req.Header.Add(SwarmTagHeaderName, "test-upload")
|
||||
client := &http.Client{}
|
||||
resp2, err := client.Do(req)
|
||||
if err != nil {
|
||||
@ -763,6 +763,11 @@ func testBzzTar(encrypted bool, t *testing.T) {
|
||||
if resp2.StatusCode != http.StatusOK {
|
||||
t.Fatalf("err %s", resp2.Status)
|
||||
}
|
||||
|
||||
// check that the tag was written correctly
|
||||
tag := srv.Tags.All()[0]
|
||||
testutil.CheckTag(t, tag, 4, 4, 0, 4)
|
||||
|
||||
swarmHash, err := ioutil.ReadAll(resp2.Body)
|
||||
resp2.Body.Close()
|
||||
if err != nil {
|
||||
@ -834,6 +839,75 @@ func testBzzTar(encrypted bool, t *testing.T) {
|
||||
t.Fatalf("file %s did not pass content assertion", hdr.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// now check the tags endpoint
|
||||
}
|
||||
|
||||
// TestBzzCorrectTagEstimate checks that the HTTP middleware sets the total number of chunks
|
||||
// in the tag according to an estimate from the HTTP request Content-Length header divided
|
||||
// by chunk size (4096). It is needed to be checked BEFORE chunking is done, therefore
|
||||
// concurrency was introduced to slow down the HTTP request
|
||||
func TestBzzCorrectTagEstimate(t *testing.T) {
|
||||
srv := NewTestSwarmServer(t, serverFunc, nil)
|
||||
defer srv.Close()
|
||||
|
||||
for _, v := range []struct {
|
||||
toEncrypt bool
|
||||
expChunks int64
|
||||
}{
|
||||
{toEncrypt: false, expChunks: 248},
|
||||
{toEncrypt: true, expChunks: 250},
|
||||
} {
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
addr := ""
|
||||
if v.toEncrypt {
|
||||
addr = "encrypt"
|
||||
}
|
||||
req, err := http.NewRequest("POST", srv.URL+"/bzz:/"+addr, pr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
req = req.WithContext(ctx)
|
||||
req.ContentLength = 1000000
|
||||
req.Header.Add(SwarmTagHeaderName, "1000000")
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(1 * time.Millisecond):
|
||||
_, err := pw.Write([]byte{0})
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
transport := http.DefaultTransport
|
||||
_, err := transport.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}()
|
||||
done := false
|
||||
for !done {
|
||||
switch len(srv.Tags.All()) {
|
||||
case 0:
|
||||
<-time.After(10 * time.Millisecond)
|
||||
case 1:
|
||||
tag := srv.Tags.All()[0]
|
||||
testutil.CheckTag(t, tag, 0, 0, 0, v.expChunks)
|
||||
srv.Tags.Delete(tag.Uid)
|
||||
done = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestBzzRootRedirect tests that getting the root path of a manifest without
|
||||
@ -851,19 +925,11 @@ func testBzzRootRedirect(toEncrypt bool, t *testing.T) {
|
||||
defer srv.Close()
|
||||
|
||||
// create a manifest with some data at the root path
|
||||
client := swarm.NewClient(srv.URL)
|
||||
data := []byte("data")
|
||||
file := &swarm.File{
|
||||
ReadCloser: ioutil.NopCloser(bytes.NewReader(data)),
|
||||
ManifestEntry: api.ManifestEntry{
|
||||
Path: "",
|
||||
ContentType: "text/plain",
|
||||
Size: int64(len(data)),
|
||||
},
|
||||
}
|
||||
hash, err := client.Upload(file, "", toEncrypt)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
headers := map[string]string{"Content-Type": "text/plain"}
|
||||
res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader(data), headers, false, t)
|
||||
if res.StatusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK)
|
||||
}
|
||||
|
||||
// define a CheckRedirect hook which ensures there is only a single
|
||||
@ -1046,21 +1112,10 @@ func TestGet(t *testing.T) {
|
||||
func TestModify(t *testing.T) {
|
||||
srv := NewTestSwarmServer(t, serverFunc, nil)
|
||||
defer srv.Close()
|
||||
|
||||
swarmClient := swarm.NewClient(srv.URL)
|
||||
data := []byte("data")
|
||||
file := &swarm.File{
|
||||
ReadCloser: ioutil.NopCloser(bytes.NewReader(data)),
|
||||
ManifestEntry: api.ManifestEntry{
|
||||
Path: "",
|
||||
ContentType: "text/plain",
|
||||
Size: int64(len(data)),
|
||||
},
|
||||
}
|
||||
|
||||
hash, err := swarmClient.Upload(file, "", false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
headers := map[string]string{"Content-Type": "text/plain"}
|
||||
res, hash := httpDo("POST", srv.URL+"/bzz:/", bytes.NewReader([]byte("data")), headers, false, t)
|
||||
if res.StatusCode != http.StatusOK {
|
||||
t.Fatalf("unexpected status code from server %d want %d", res.StatusCode, http.StatusOK)
|
||||
}
|
||||
|
||||
for _, testCase := range []struct {
|
||||
@ -1283,6 +1338,46 @@ func TestBzzGetFileWithResolver(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestCalculateNumberOfChunks is a unit test for the chunk-number-according-to-content-length
|
||||
// calculation
|
||||
func TestCalculateNumberOfChunks(t *testing.T) {
|
||||
|
||||
//test cases:
|
||||
for _, tc := range []struct{ len, chunks int64 }{
|
||||
{len: 1000, chunks: 1},
|
||||
{len: 5000, chunks: 3},
|
||||
{len: 10000, chunks: 4},
|
||||
{len: 100000, chunks: 26},
|
||||
{len: 1000000, chunks: 248},
|
||||
{len: 325839339210, chunks: 79550620 + 621490 + 4856 + 38 + 1},
|
||||
} {
|
||||
res := calculateNumberOfChunks(tc.len, false)
|
||||
if res != tc.chunks {
|
||||
t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestCalculateNumberOfChunksEncrypted is a unit test for the chunk-number-according-to-content-length
|
||||
// calculation with encryption (branching factor=64)
|
||||
func TestCalculateNumberOfChunksEncrypted(t *testing.T) {
|
||||
|
||||
//test cases:
|
||||
for _, tc := range []struct{ len, chunks int64 }{
|
||||
{len: 1000, chunks: 1},
|
||||
{len: 5000, chunks: 3},
|
||||
{len: 10000, chunks: 4},
|
||||
{len: 100000, chunks: 26},
|
||||
{len: 1000000, chunks: 245 + 4 + 1},
|
||||
{len: 325839339210, chunks: 79550620 + 1242979 + 19422 + 304 + 5 + 1},
|
||||
} {
|
||||
res := calculateNumberOfChunks(tc.len, true)
|
||||
if res != tc.chunks {
|
||||
t.Fatalf("expected result for %d bytes to be %d got %d", tc.len, tc.chunks, res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// testResolver implements the Resolver interface and either returns the given
|
||||
// hash if it is set, or returns a "name not found" error
|
||||
type testResolveValidator struct {
|
||||
@ -1308,6 +1403,7 @@ func (t *testResolveValidator) Resolve(addr string) (common.Hash, error) {
|
||||
func (t *testResolveValidator) Owner(node [32]byte) (addr common.Address, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (t *testResolveValidator) HeaderByNumber(context.Context, *big.Int) (header *types.Header, err error) {
|
||||
return
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/swarm/api"
|
||||
"github.com/ethereum/go-ethereum/swarm/chunk"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage/feed"
|
||||
"github.com/ethereum/go-ethereum/swarm/storage/localstore"
|
||||
@ -44,7 +45,9 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams())
|
||||
tags := chunk.NewTags()
|
||||
fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams(), tags)
|
||||
|
||||
// Swarm feeds test setup
|
||||
feedsDir, err := ioutil.TempDir("", "swarm-feeds-test")
|
||||
if err != nil {
|
||||
@ -56,12 +59,13 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API) TestServer, reso
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil)
|
||||
swarmApi := api.NewAPI(fileStore, resolver, feeds.Handler, nil, tags)
|
||||
apiServer := httptest.NewServer(serverFunc(swarmApi))
|
||||
|
||||
tss := &TestSwarmServer{
|
||||
Server: apiServer,
|
||||
FileStore: fileStore,
|
||||
Tags: tags,
|
||||
dir: swarmDir,
|
||||
Hasher: storage.MakeHashFunc(storage.DefaultHash)(),
|
||||
cleanup: func() {
|
||||
@ -81,6 +85,7 @@ type TestSwarmServer struct {
|
||||
*httptest.Server
|
||||
Hasher storage.SwarmHash
|
||||
FileStore *storage.FileStore
|
||||
Tags *chunk.Tags
|
||||
dir string
|
||||
cleanup func()
|
||||
CurrentTime uint64
|
||||
|
Reference in New Issue
Block a user