swarm/storage: fix chunker when reader is broken
* brokenLimitedReader gives an error after half the size has been read
* TestRandomBrokenData tests the chunker with a broken reader
* add blocking quitC (instead of errC) and use errC only for errors
* don't close chunkC in tester Split
* use quitC to quit the chunk storage loop
This commit is contained in:
@ -23,8 +23,6 @@ import (
|
||||
"hash"
|
||||
"io"
|
||||
"sync"
|
||||
// "github.com/ethereum/go-ethereum/logger"
|
||||
// "github.com/ethereum/go-ethereum/logger/glog"
|
||||
)
|
||||
|
||||
/*
|
||||
@ -124,12 +122,13 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
|
||||
jobC := make(chan *hashJob, 2*processors)
|
||||
wg := &sync.WaitGroup{}
|
||||
errC := make(chan error)
|
||||
quitC := make(chan bool)
|
||||
|
||||
// wwg = workers waitgroup keeps track of hashworkers spawned by this split call
|
||||
if wwg != nil {
|
||||
wwg.Add(1)
|
||||
}
|
||||
go self.hashWorker(jobC, chunkC, errC, swg, wwg)
|
||||
go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
|
||||
|
||||
depth := 0
|
||||
treeSize := self.chunkSize
|
||||
@ -141,11 +140,10 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
|
||||
}
|
||||
|
||||
key := make([]byte, self.hashFunc().Size())
|
||||
// glog.V(logger.Detail).Infof("split request received for data (%v bytes, depth: %v)", size, depth)
|
||||
// this waitgroup member is released after the root hash is calculated
|
||||
wg.Add(1)
|
||||
//launch actual recursive function passing the waitgroups
|
||||
go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, wg, swg, wwg)
|
||||
go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, quitC, wg, swg, wwg)
|
||||
|
||||
// closes internal error channel if all subprocesses in the workgroup finished
|
||||
go func() {
|
||||
@ -153,7 +151,6 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
|
||||
wg.Wait()
|
||||
// if storage waitgroup is non-nil, we wait for storage to finish too
|
||||
if swg != nil {
|
||||
// glog.V(logger.Detail).Infof("Waiting for storage to finish")
|
||||
swg.Wait()
|
||||
}
|
||||
close(errC)
|
||||
@ -162,14 +159,15 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s
|
||||
select {
|
||||
case err := <-errC:
|
||||
if err != nil {
|
||||
close(quitC)
|
||||
return nil, err
|
||||
}
|
||||
//
|
||||
//TODO: add a timeout
|
||||
}
|
||||
return key, nil
|
||||
}
|
||||
|
||||
func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, parentWg, swg, wwg *sync.WaitGroup) {
|
||||
func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) {
|
||||
|
||||
for depth > 0 && size < treeSize {
|
||||
treeSize /= self.branches
|
||||
@ -180,17 +178,20 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
|
||||
// leaf nodes -> content chunks
|
||||
chunkData := make([]byte, size+8)
|
||||
binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size))
|
||||
data.Read(chunkData[8:])
|
||||
_, err := data.Read(chunkData[8:])
|
||||
if err != nil {
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
select {
|
||||
case jobC <- &hashJob{key, chunkData, size, parentWg}:
|
||||
case <-errC:
|
||||
case <-quitC:
|
||||
}
|
||||
// glog.V(logger.Detail).Infof("read %v", size)
|
||||
return
|
||||
}
|
||||
// depth > 0
|
||||
// intermediate chunk containing child nodes hashes
|
||||
branchCnt := int64((size + treeSize - 1) / treeSize)
|
||||
// glog.V(logger.Detail).Infof("intermediate node: setting branches: %v, depth: %v, max subtree size: %v, data size: %v", branches, depth, treeSize, size)
|
||||
|
||||
var chunk []byte = make([]byte, branchCnt*self.hashSize+8)
|
||||
var pos, i int64
|
||||
@ -210,7 +211,7 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
|
||||
subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize]
|
||||
|
||||
childrenWg.Add(1)
|
||||
self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, childrenWg, swg, wwg)
|
||||
self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, quitC, childrenWg, swg, wwg)
|
||||
|
||||
i++
|
||||
pos += treeSize
|
||||
@ -224,15 +225,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade
|
||||
wwg.Add(1)
|
||||
}
|
||||
self.workerCount++
|
||||
go self.hashWorker(jobC, chunkC, errC, swg, wwg)
|
||||
go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg)
|
||||
}
|
||||
select {
|
||||
case jobC <- &hashJob{key, chunk, size, parentWg}:
|
||||
case <-errC:
|
||||
case <-quitC:
|
||||
}
|
||||
}
|
||||
|
||||
func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, swg, wwg *sync.WaitGroup) {
|
||||
func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) {
|
||||
hasher := self.hashFunc()
|
||||
if wwg != nil {
|
||||
defer wwg.Done()
|
||||
@ -247,8 +248,7 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC
|
||||
// now we got the hashes in the chunk, then hash the chunks
|
||||
hasher.Reset()
|
||||
self.hashChunk(hasher, job, chunkC, swg)
|
||||
// glog.V(logger.Detail).Infof("hash chunk (%v)", job.size)
|
||||
case <-errC:
|
||||
case <-quitC:
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -276,6 +276,7 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan *
|
||||
}
|
||||
}
|
||||
job.parentWg.Done()
|
||||
|
||||
if chunkC != nil {
|
||||
chunkC <- newChunk
|
||||
}
|
||||
@ -328,7 +329,6 @@ func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) {
|
||||
func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
|
||||
// this is correct, a swarm doc cannot be zero length, so no EOF is expected
|
||||
if len(b) == 0 {
|
||||
// glog.V(logger.Detail).Infof("Size query for %v", chunk.Key.Log())
|
||||
return 0, nil
|
||||
}
|
||||
quitC := make(chan bool)
|
||||
@ -336,13 +336,10 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// glog.V(logger.Detail).Infof("readAt: len(b): %v, off: %v, size: %v ", len(b), off, size)
|
||||
|
||||
errC := make(chan error)
|
||||
// glog.V(logger.Detail).Infof("readAt: reading %v into %d bytes at offset %d.", self.chunk.Key.Log(), len(b), off)
|
||||
|
||||
// }
|
||||
// glog.V(logger.Detail).Infof("-> want: %v, off: %v size: %v ", want, off, self.size)
|
||||
var treeSize int64
|
||||
var depth int
|
||||
// calculate depth and max treeSize
|
||||
@ -364,22 +361,16 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) {
|
||||
|
||||
return 0, err
|
||||
}
|
||||
// glog.V(logger.Detail).Infof("ReadAt received %v", err)
|
||||
// glog.V(logger.Detail).Infof("end: len(b): %v, off: %v, size: %v ", len(b), off, size)
|
||||
if off+int64(len(b)) >= size {
|
||||
// glog.V(logger.Detail).Infof(" len(b): %v EOF", len(b))
|
||||
return len(b), io.EOF
|
||||
}
|
||||
// glog.V(logger.Detail).Infof("ReadAt returning at %d: %v", read, err)
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) {
|
||||
defer parentWg.Done()
|
||||
// return NewDPA(&LocalStore{})
|
||||
// glog.V(logger.Detail).Infof("inh len(b): %v, off: %v eoff: %v ", len(b), off, eoff)
|
||||
|
||||
// glog.V(logger.Detail).Infof("depth: %v, loff: %v, eoff: %v, chunk.Size: %v, treeSize: %v", depth, off, eoff, chunk.Size, treeSize)
|
||||
|
||||
// chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
|
||||
|
||||
@ -391,7 +382,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
|
||||
|
||||
// leaf chunk found
|
||||
if depth == 0 {
|
||||
// glog.V(logger.Detail).Infof("depth: %v, len(b): %v, off: %v, eoff: %v, chunk.Size: %v %v, treeSize: %v", depth, len(b), off, eoff, chunk.Size, len(chunk.SData), treeSize)
|
||||
extra := 8 + eoff - int64(len(chunk.SData))
|
||||
if extra > 0 {
|
||||
eoff -= extra
|
||||
@ -406,7 +396,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
defer wg.Wait()
|
||||
// glog.V(logger.Detail).Infof("start %v,end %v", start, end)
|
||||
|
||||
for i := start; i < end; i++ {
|
||||
soff := i * treeSize
|
||||
@ -425,7 +414,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr
|
||||
wg.Add(1)
|
||||
go func(j int64) {
|
||||
childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize]
|
||||
// glog.V(logger.Detail).Infof("subtree ind.ex: %v -> %v", j, childKey.Log())
|
||||
chunk := retrieve(childKey, self.chunkC, quitC)
|
||||
if chunk == nil {
|
||||
select {
|
||||
@ -450,7 +438,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
|
||||
Key: key,
|
||||
C: make(chan bool), // close channel to signal data delivery
|
||||
}
|
||||
// glog.V(logger.Detail).Infof("chunk data sent for %v (key interval in chunk %v-%v)", ch.Key.Log(), j*self.chunker.hashSize, (j+1)*self.chunker.hashSize)
|
||||
// submit chunk for retrieval
|
||||
select {
|
||||
case chunkC <- chunk: // submit retrieval request, someone should be listening on the other side (or we will time out globally)
|
||||
@ -464,7 +451,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
|
||||
// this is how we control process leakage (quitC is closed once join is finished (after timeout))
|
||||
return nil
|
||||
case <-chunk.C: // bells are ringing, data have been delivered
|
||||
// glog.V(logger.Detail).Infof("chunk data received")
|
||||
}
|
||||
if len(chunk.SData) == 0 {
|
||||
return nil // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8]))
|
||||
@ -476,7 +462,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk {
|
||||
// Read keeps a cursor, so it cannot be called simultaneously; see ReadAt
|
||||
func (self *LazyChunkReader) Read(b []byte) (read int, err error) {
|
||||
read, err = self.ReadAt(b, self.off)
|
||||
// glog.V(logger.Detail).Infof("read: %v, off: %v, error: %v", read, self.off, err)
|
||||
|
||||
self.off += int64(read)
|
||||
return
|
||||
|
Reference in New Issue
Block a user