From f57d4f0802324a6a65fdfda4203595307977e91f Mon Sep 17 00:00:00 2001
From: Zahoor Mohamed
Date: Tue, 18 Jun 2019 12:16:20 +0530
Subject: [PATCH] swarm/storage: Support for uploading 100gb files (#1395)

* swarm/storage: Fix goroutine leak while uploading large files

* swarm/storage: Fix pyramid chunker to wrap level 3 and above properly
---
 storage/chunker_test.go          |  2 +-
 storage/hasherstore.go           | 35 +++++++++++++++++++++++++++++++++++----
 storage/hasherstore_test.go      |  3 ++-
 storage/localstore/localstore.go |  2 +-
 storage/pyramid.go               | 13 +++++++++++--
 5 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/storage/chunker_test.go b/storage/chunker_test.go
index 9e85a917c6..7351c78341 100644
--- a/storage/chunker_test.go
+++ b/storage/chunker_test.go
@@ -238,7 +238,7 @@ func TestRandomData(t *testing.T) {
 	// This test can validate files up to a relatively short length, as tree chunker slows down drastically.
 	// Validation of longer files is done by TestLocalStoreAndRetrieve in swarm package.
 	//sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 12287, 12288, 12289, 524288, 524288 + 1, 524288 + 4097, 7 * 524288, 7*524288 + 1, 7*524288 + 4097}
-	sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4097, 8191, 8192, 12288, 12289, 524288}
+	sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4097, 8191, 8192, 12288, 12289, 524288, 2345678}
 	tester := &chunkerTester{t: t}
 
 	for _, s := range sizes {
diff --git a/storage/hasherstore.go b/storage/hasherstore.go
index a6a4da6b30..0ac8cc408d 100644
--- a/storage/hasherstore.go
+++ b/storage/hasherstore.go
@@ -19,6 +19,7 @@ package storage
 import (
 	"context"
 	"fmt"
+	"sync"
 	"sync/atomic"
 
 	"github.com/ethersphere/swarm/chunk"
@@ -26,6 +27,11 @@ import (
 	"golang.org/x/crypto/sha3"
 )
 
+const (
+	noOfStorageWorkers = 150 // Since we want 128 data chunks to be processed in parallel + a few for processing tree chunks
+
+)
+
 type hasherStore struct {
 	// nrChunks is used with atomic functions
 	// it is required to be at the start of the struct to ensure 64bit alignment for ARM, x86-32, and 32-bit MIPS architectures
@@ -34,12 +40,15 @@ type hasherStore struct {
 	store     ChunkStore
 	tag       *chunk.Tag
 	toEncrypt bool
+	doWait    sync.Once
 	hashFunc  SwarmHasher
 	hashSize  int   // content hash size
 	refSize   int64 // reference size (content hash + possibly encryption key)
 	errC      chan error    // global error channel
+	waitC     chan error    // global wait channel
 	doneC     chan struct{} // closed by Close() call to indicate that count is the final number of chunks
 	quitC     chan struct{} // closed to quit unterminated routines
+	workers   chan Chunk    // back pressure for limiting storage workers goroutines
 }
 
 // NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces.
@@ -60,10 +69,11 @@ func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool, tag
 		hashSize:  hashSize,
 		refSize:   refSize,
 		errC:      make(chan error),
+		waitC:     make(chan error),
 		doneC:     make(chan struct{}),
 		quitC:     make(chan struct{}),
+		workers:   make(chan Chunk, noOfStorageWorkers),
 	}
-
 	return h
 }
 
@@ -83,6 +93,11 @@ func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference,
 	chunk := h.createChunk(c)
 	h.storeChunk(ctx, chunk)
 
+	// Start the wait function which will detect completion of put
+	h.doWait.Do(func() {
+		go h.startWait(ctx)
+	})
+
 	return Reference(append(chunk.Address(), encryptionKey...)), nil
 }
 
@@ -121,8 +136,15 @@ func (h *hasherStore) Close() {
 // Wait returns when
 // 1) the Close() function has been called and
 // 2) all the chunks which has been Put has been stored
+// OR
+// 1) there is an error while storing a chunk
 func (h *hasherStore) Wait(ctx context.Context) error {
 	defer close(h.quitC)
+	err := <-h.waitC
+	return err
+}
+
+func (h *hasherStore) startWait(ctx context.Context) {
 	var nrStoredChunks uint64 // number of stored chunks
 	var done bool
 	doneC := h.doneC
@@ -130,7 +152,7 @@ func (h *hasherStore) Wait(ctx context.Context) error {
 		select {
 		// if context is done earlier, just return with the error
 		case <-ctx.Done():
-			return ctx.Err()
+			h.waitC <- ctx.Err()
 		// doneC is closed if all chunks have been submitted, from then we just wait until all of them are also stored
 		case <-doneC:
 			done = true
@@ -138,14 +160,15 @@ func (h *hasherStore) Wait(ctx context.Context) error {
 		// a chunk has been stored, if err is nil, then successfully, so increase the stored chunk counter
 		case err := <-h.errC:
 			if err != nil {
-				return err
+				h.waitC <- err
 			}
 			nrStoredChunks++
 		}
 		// if all the chunks have been submitted and all of them are stored, then we can return
 		if done {
 			if nrStoredChunks >= atomic.LoadUint64(&h.nrChunks) {
-				return nil
+				h.waitC <- nil
+				break
 			}
 		}
 	}
@@ -242,8 +265,12 @@ func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryptio
 }
 
 func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
+	h.workers <- ch
 	atomic.AddUint64(&h.nrChunks, 1)
 	go func() {
+		defer func() {
+			<-h.workers
+		}()
 		seen, err := h.store.Put(ctx, chunk.ModePutUpload, ch)
 		h.tag.Inc(chunk.StateStored)
 		if seen {
diff --git a/storage/hasherstore_test.go b/storage/hasherstore_test.go
index d8be9fbc6a..4b6347418f 100644
--- a/storage/hasherstore_test.go
+++ b/storage/hasherstore_test.go
@@ -42,8 +42,9 @@ func TestHasherStore(t *testing.T) {
 	}
 
 	for _, tt := range tests {
+
 		chunkStore := NewMapChunkStore()
-		hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(0, "test-tag", 0))
+		hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(0, "test-tag", 2))
 
 		// Put two random chunks into the hasherStore
 		chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data()
diff --git a/storage/localstore/localstore.go b/storage/localstore/localstore.go
index d2a5b7cbff..265cd045ed 100644
--- a/storage/localstore/localstore.go
+++ b/storage/localstore/localstore.go
@@ -134,7 +134,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
 	if o == nil {
 		// default options
 		o = &Options{
-			Capacity: 5000000,
+			Capacity: defaultCapacity,
 		}
 	}
 	db = &DB{
diff --git a/storage/pyramid.go b/storage/pyramid.go
index 6ca7b0f64f..343a9f5a89 100644
--- a/storage/pyramid.go
+++ b/storage/pyramid.go
@@ -204,7 +204,7 @@ func (pc *PyramidChunker) decrementWorkerCount() {
 
 func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
 	pc.wg.Add(1)
-	pc.prepareChunks(ctx, false)
+	go pc.prepareChunks(ctx, false)
 
 	// closes internal error channel if all subprocesses in the workgroup finished
 	go func() {
@@ -239,7 +239,7 @@ func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(cont
 	pc.loadTree(ctx)
 
 	pc.wg.Add(1)
-	pc.prepareChunks(ctx, true)
+	go pc.prepareChunks(ctx, true)
 
 	// closes internal error channel if all subprocesses in the workgroup finished
 	go func() {
@@ -539,6 +539,15 @@ func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync
 		if lvlCount >= pc.branches {
 			endLvl = lvl + 1
 			compress = true
+
+			// Move up the chunk level to see if there is any boundary wrapping
+			for uprLvl := endLvl; uprLvl < pc.branches; uprLvl++ {
+				uprLvlCount := int64(len(pc.chunkLevel[uprLvl]))
+				if uprLvlCount >= pc.branches-1 {
+					endLvl = endLvl + 1
+				}
+			}
+
 			break
 		}
 	}
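
The core of the hasherstore change above is a back-pressure pattern: storeChunk first sends the chunk into the buffered workers channel, and the goroutine it spawns receives from that channel when the store call finishes, so at most noOfStorageWorkers store goroutines are ever in flight. The standalone Go sketch below illustrates that buffered-channel-as-semaphore technique in isolation; it is not part of the patch, and its identifiers (maxWorkers, the simulated store loop) are illustrative rather than taken from the swarm code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxWorkers plays the role of noOfStorageWorkers in the patch: it caps how
// many store goroutines may run at once, so a very large upload cannot spawn
// an unbounded number of them (the goroutine leak the commit message mentions).
const maxWorkers = 4

func main() {
	workers := make(chan struct{}, maxWorkers) // buffered channel used as a semaphore
	var stored uint64
	var wg sync.WaitGroup

	for i := 0; i < 20; i++ {
		workers <- struct{}{} // blocks once maxWorkers stores are in flight (back pressure)
		wg.Add(1)
		go func() {
			defer func() {
				<-workers // release the slot when this (simulated) store completes
				wg.Done()
			}()
			atomic.AddUint64(&stored, 1) // stand-in for the real chunk-store Put call
		}()
	}

	wg.Wait()
	fmt.Println("chunks stored:", atomic.LoadUint64(&stored))
}

The patch sizes the real buffer at 150 because, as its added comment notes, up to 128 data chunks are processed in parallel plus a few tree chunks; the sketch uses a small limit only to keep the example short.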