swarm/storage: Support for uploading 100gb files (#1395)

* swarm/storage: Fix goroutine leak while uploading large files
* swarm/storage: Fix pyramid chunker to properly wrap level 3 and above
Zahoor Mohamed
2019-06-18 12:16:20 +05:30
committed by acud
parent 604960938b
commit f57d4f0802
5 changed files with 46 additions and 9 deletions

View File

@@ -238,7 +238,7 @@ func TestRandomData(t *testing.T) {
 	// This test can validate files up to a relatively short length, as tree chunker slows down drastically.
 	// Validation of longer files is done by TestLocalStoreAndRetrieve in swarm package.
 	//sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 12287, 12288, 12289, 524288, 524288 + 1, 524288 + 4097, 7 * 524288, 7*524288 + 1, 7*524288 + 4097}
-	sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4097, 8191, 8192, 12288, 12289, 524288}
+	sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4097, 8191, 8192, 12288, 12289, 524288, 2345678}
 	tester := &chunkerTester{t: t}
 	for _, s := range sizes {

View File

@@ -19,6 +19,7 @@ package storage
 import (
 	"context"
 	"fmt"
+	"sync"
 	"sync/atomic"

 	"github.com/ethersphere/swarm/chunk"
@@ -26,6 +27,11 @@ import (
 	"golang.org/x/crypto/sha3"
 )

+const (
+	noOfStorageWorkers = 150 // Since we want 128 data chunks to be processed parallel + few for processing tree chunks
+)
+
 type hasherStore struct {
 	// nrChunks is used with atomic functions
 	// it is required to be at the start of the struct to ensure 64bit alignment for ARM, x86-32, and 32-bit MIPS architectures
@@ -34,12 +40,15 @@ type hasherStore struct {
 	store     ChunkStore
 	tag       *chunk.Tag
 	toEncrypt bool
+	doWait    sync.Once
 	hashFunc  SwarmHasher
 	hashSize  int           // content hash size
 	refSize   int64         // reference size (content hash + possibly encryption key)
 	errC      chan error    // global error channel
+	waitC     chan error    // global wait channel
 	doneC     chan struct{} // closed by Close() call to indicate that count is the final number of chunks
 	quitC     chan struct{} // closed to quit unterminated routines
+	workers   chan Chunk    // back pressure for limiting storage workers goroutines
 }

 // NewHasherStore creates a hasherStore object, which implements Putter and Getter interfaces.
@@ -60,10 +69,11 @@ func NewHasherStore(store ChunkStore, hashFunc SwarmHasher, toEncrypt bool, tag
 		hashSize: hashSize,
 		refSize:  refSize,
 		errC:     make(chan error),
+		waitC:    make(chan error),
 		doneC:    make(chan struct{}),
 		quitC:    make(chan struct{}),
+		workers:  make(chan Chunk, noOfStorageWorkers),
 	}

 	return h
 }
@@ -83,6 +93,11 @@ func (h *hasherStore) Put(ctx context.Context, chunkData ChunkData) (Reference,
 	chunk := h.createChunk(c)
 	h.storeChunk(ctx, chunk)

+	// Start the wait function which will detect completion of put
+	h.doWait.Do(func() {
+		go h.startWait(ctx)
+	})
+
 	return Reference(append(chunk.Address(), encryptionKey...)), nil
 }
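The doWait/sync.Once pairing guarantees that only the first Put call launches the background waiter, no matter how many goroutines call Put concurrently. A minimal standalone sketch of that pattern, with hypothetical names rather than the swarm types:

package main

import (
	"fmt"
	"sync"
	"time"
)

type uploader struct {
	startOnce sync.Once  // ensures the waiter goroutine is started exactly once
	resultC   chan error // the waiter reports the final result here
}

func (u *uploader) put(i int) {
	// Every put may try to start the waiter; sync.Once lets only the first succeed.
	u.startOnce.Do(func() {
		go u.waitLoop()
	})
	fmt.Println("submitted chunk", i)
}

func (u *uploader) waitLoop() {
	// Stand-in for the real completion-tracking loop.
	time.Sleep(10 * time.Millisecond)
	u.resultC <- nil
}

func main() {
	u := &uploader{resultC: make(chan error, 1)}
	for i := 0; i < 3; i++ {
		u.put(i)
	}
	fmt.Println("wait result:", <-u.resultC)
}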
@@ -121,8 +136,15 @@ func (h *hasherStore) Close() {
 // Wait returns when
 // 1) the Close() function has been called and
 // 2) all the chunks which has been Put has been stored
+// OR
+// 1) if there is error while storing chunk
 func (h *hasherStore) Wait(ctx context.Context) error {
 	defer close(h.quitC)
+	err := <-h.waitC
+	return err
+}
+
+func (h *hasherStore) startWait(ctx context.Context) {
 	var nrStoredChunks uint64 // number of stored chunks
 	var done bool
 	doneC := h.doneC
@@ -130,7 +152,7 @@ func (h *hasherStore) Wait(ctx context.Context) error {
 		select {
 		// if context is done earlier, just return with the error
 		case <-ctx.Done():
-			return ctx.Err()
+			h.waitC <- ctx.Err()
 		// doneC is closed if all chunks have been submitted, from then we just wait until all of them are also stored
 		case <-doneC:
 			done = true
@@ -138,14 +160,15 @@
 		// a chunk has been stored, if err is nil, then successfully, so increase the stored chunk counter
 		case err := <-h.errC:
 			if err != nil {
-				return err
+				h.waitC <- err
 			}
 			nrStoredChunks++
 		}
 		// if all the chunks have been submitted and all of them are stored, then we can return
 		if done {
 			if nrStoredChunks >= atomic.LoadUint64(&h.nrChunks) {
-				return nil
+				h.waitC <- nil
+				break
 			}
 		}
 	}
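Taken together, the waiter now owns the whole select loop and reports exactly one outcome on the wait channel, which Wait consumes and returns. A condensed, hypothetical sketch of that control flow (simplified signatures, not the swarm code):

package main

import (
	"context"
	"fmt"
)

// startWait is a condensed, hypothetical version of the waiter loop: it
// consumes per-chunk results from errC, learns via doneC that no more chunks
// will be submitted, and reports exactly one final result on waitC.
func startWait(ctx context.Context, errC <-chan error, doneC <-chan struct{},
	expected func() uint64, waitC chan<- error) {
	var stored uint64
	var done bool
	for {
		select {
		case <-ctx.Done():
			waitC <- ctx.Err() // propagate cancellation to the caller of Wait
			return
		case <-doneC:
			done = true
			doneC = nil // stop selecting on the closed channel
		case err := <-errC:
			if err != nil {
				waitC <- err // the first storage error ends the wait
				return
			}
			stored++
		}
		if done && stored >= expected() {
			waitC <- nil // all submitted chunks stored successfully
			return
		}
	}
}

func main() {
	errC := make(chan error)
	doneC := make(chan struct{})
	waitC := make(chan error, 1)
	go startWait(context.Background(), errC, doneC, func() uint64 { return 3 }, waitC)
	for i := 0; i < 3; i++ {
		errC <- nil // three chunks stored without error
	}
	close(doneC) // no more chunks will be submitted
	fmt.Println("wait result:", <-waitC)
}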
@@ -242,8 +265,12 @@ func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryptio
 }

 func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
+	h.workers <- ch
 	atomic.AddUint64(&h.nrChunks, 1)
 	go func() {
+		defer func() {
+			<-h.workers
+		}()
 		seen, err := h.store.Put(ctx, chunk.ModePutUpload, ch)
 		h.tag.Inc(chunk.StateStored)
 		if seen {
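The workers channel acts as a counting semaphore: storeChunk acquires a slot before spawning the storage goroutine and releases it when the store call returns, so at most noOfStorageWorkers (150) store goroutines are ever in flight, capping goroutine growth on very large uploads. A minimal standalone sketch of the same back-pressure pattern, with hypothetical names:

package main

import (
	"fmt"
	"sync"
	"time"
)

const maxWorkers = 4 // cap on concurrently running store operations

func main() {
	sem := make(chan struct{}, maxWorkers) // buffered channel used as a counting semaphore
	var wg sync.WaitGroup

	for i := 0; i < 16; i++ {
		sem <- struct{}{} // blocks here once maxWorkers stores are already in flight
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer func() { <-sem }()          // release the slot when this store finishes
			time.Sleep(10 * time.Millisecond) // stand-in for the chunk store call
			fmt.Println("stored chunk", n)
		}(i)
	}
	wg.Wait()
}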

View File

@@ -42,8 +42,9 @@ func TestHasherStore(t *testing.T) {
 	}

 	for _, tt := range tests {
 		chunkStore := NewMapChunkStore()
-		hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(0, "test-tag", 0))
+		hasherStore := NewHasherStore(chunkStore, MakeHashFunc(DefaultHash), tt.toEncrypt, chunk.NewTag(0, "test-tag", 2))

 		// Put two random chunks into the hasherStore
 		chunkData1 := GenerateRandomChunk(int64(tt.chunkLength)).Data()

View File

@@ -134,7 +134,7 @@ func New(path string, baseKey []byte, o *Options) (db *DB, err error) {
 	if o == nil {
 		// default options
 		o = &Options{
-			Capacity: 5000000,
+			Capacity: defaultCapacity,
 		}
 	}
 	db = &DB{

View File

@@ -204,7 +204,7 @@ func (pc *PyramidChunker) decrementWorkerCount() {
 func (pc *PyramidChunker) Split(ctx context.Context) (k Address, wait func(context.Context) error, err error) {
 	pc.wg.Add(1)
-	pc.prepareChunks(ctx, false)
+	go pc.prepareChunks(ctx, false)

 	// closes internal error channel if all subprocesses in the workgroup finished
 	go func() {
@@ -239,7 +239,7 @@ func (pc *PyramidChunker) Append(ctx context.Context) (k Address, wait func(cont
 	pc.loadTree(ctx)
 	pc.wg.Add(1)
-	pc.prepareChunks(ctx, true)
+	go pc.prepareChunks(ctx, true)

 	// closes internal error channel if all subprocesses in the workgroup finished
 	go func() {
@@ -539,6 +539,15 @@ func (pc *PyramidChunker) buildTree(isAppend bool, ent *TreeEntry, chunkWG *sync
 			if lvlCount >= pc.branches {
 				endLvl = lvl + 1
 				compress = true
+
+				// Move up the chunk level to see if there is any boundary wrapping
+				for uprLvl := endLvl; uprLvl < pc.branches; uprLvl++ {
+					uprLvlCount := int64(len(pc.chunkLevel[uprLvl]))
+					if uprLvlCount >= pc.branches-1 {
+						endLvl = endLvl + 1
+					}
+				}
+
 				break
 			}
 		}
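The added loop handles the case where completing the current level should also complete levels above it, which is the "properly wrap level 3 and above" fix from the commit message: every higher level that is one entry short of full wraps as well once the level below hands its chunk up, much like carry propagation in a base-branches counter. A hypothetical, self-contained illustration of that idea (not the chunker's actual data structures):

package main

import "fmt"

// levelsToWrap reports how many tree levels complete when one more entry
// arrives at the lowest level, where counts[i] is the number of entries
// already pending at level i and a level wraps once it holds `branches`
// entries. Each wrap hands a single chunk up to the next level, like a
// carry in a base-`branches` counter.
func levelsToWrap(counts []int, branches int) int {
	wrapped := 0
	for _, c := range counts {
		if c+1 < branches { // the carried chunk does not fill this level
			break
		}
		wrapped++ // this level wraps and carries one chunk upward
	}
	return wrapped
}

func main() {
	const branches = 128
	// Levels 0-2 are one entry short of full, level 3 is half full.
	counts := []int{branches - 1, branches - 1, branches - 1, 64}
	fmt.Println("levels that wrap:", levelsToWrap(counts, branches)) // prints 3
}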