* swarm: propagate ctx, enable opentracing * swarm/tracing: log error when tracing is misconfigured
		
			
				
	
	
		
			199 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			199 lines
		
	
	
		
			5.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2016 The go-ethereum Authors
 | |
| // This file is part of the go-ethereum library.
 | |
| //
 | |
| // The go-ethereum library is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Lesser General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // The go-ethereum library is distributed in the hope that it will be useful,
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | |
| // GNU Lesser General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Lesser General Public License
 | |
| // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package storage
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/ethereum/go-ethereum/swarm/log"
 | |
| 	"github.com/ethereum/go-ethereum/swarm/spancontext"
 | |
| 	opentracing "github.com/opentracing/opentracing-go"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	// NetStore.Get timeout for get and get retries
 | |
| 	// This is the maximum period that the Get will block.
 | |
| 	// If it is reached, Get will return ErrChunkNotFound.
 | |
| 	netStoreRetryTimeout = 30 * time.Second
 | |
| 	// Minimal period between calling get method on NetStore
 | |
| 	// on retry. It protects calling get very frequently if
 | |
| 	// it returns ErrChunkNotFound very fast.
 | |
| 	netStoreMinRetryDelay = 3 * time.Second
 | |
| 	// Timeout interval before retrieval is timed out.
 | |
| 	// It is used in NetStore.get on waiting for ReqC to be
 | |
| 	// closed on a single retrieve request.
 | |
| 	searchTimeout = 10 * time.Second
 | |
| )
 | |
| 
 | |
| // NetStore implements the ChunkStore interface,
 | |
| // this chunk access layer assumed 2 chunk stores
 | |
| // local storage eg. LocalStore and network storage eg., NetStore
 | |
| // access by calling network is blocking with a timeout
 | |
| type NetStore struct {
 | |
| 	localStore *LocalStore
 | |
| 	retrieve   func(ctx context.Context, chunk *Chunk) error
 | |
| }
 | |
| 
 | |
| func NewNetStore(localStore *LocalStore, retrieve func(ctx context.Context, chunk *Chunk) error) *NetStore {
 | |
| 	return &NetStore{localStore, retrieve}
 | |
| }
 | |
| 
 | |
| // Get is the entrypoint for local retrieve requests
 | |
| // waits for response or times out
 | |
| //
 | |
| // Get uses get method to retrieve request, but retries if the
 | |
| // ErrChunkNotFound is returned by get, until the netStoreRetryTimeout
 | |
| // is reached.
 | |
| func (ns *NetStore) Get(ctx context.Context, addr Address) (chunk *Chunk, err error) {
 | |
| 
 | |
| 	var sp opentracing.Span
 | |
| 	ctx, sp = spancontext.StartSpan(
 | |
| 		ctx,
 | |
| 		"netstore.get.global")
 | |
| 	defer sp.Finish()
 | |
| 
 | |
| 	timer := time.NewTimer(netStoreRetryTimeout)
 | |
| 	defer timer.Stop()
 | |
| 
 | |
| 	// result and resultC provide results from the goroutine
 | |
| 	// where NetStore.get is called.
 | |
| 	type result struct {
 | |
| 		chunk *Chunk
 | |
| 		err   error
 | |
| 	}
 | |
| 	resultC := make(chan result)
 | |
| 
 | |
| 	// quitC ensures that retring goroutine is terminated
 | |
| 	// when this function returns.
 | |
| 	quitC := make(chan struct{})
 | |
| 	defer close(quitC)
 | |
| 
 | |
| 	// do retries in a goroutine so that the timer can
 | |
| 	// force this method to return after the netStoreRetryTimeout.
 | |
| 	go func() {
 | |
| 		// limiter ensures that NetStore.get is not called more frequently
 | |
| 		// then netStoreMinRetryDelay. If NetStore.get takes longer
 | |
| 		// then netStoreMinRetryDelay, the next retry call will be
 | |
| 		// without a delay.
 | |
| 		limiter := time.NewTimer(netStoreMinRetryDelay)
 | |
| 		defer limiter.Stop()
 | |
| 
 | |
| 		for {
 | |
| 			chunk, err := ns.get(ctx, addr, 0)
 | |
| 			if err != ErrChunkNotFound {
 | |
| 				// break retry only if the error is nil
 | |
| 				// or other error then ErrChunkNotFound
 | |
| 				select {
 | |
| 				case <-quitC:
 | |
| 					// Maybe NetStore.Get function has returned
 | |
| 					// by the timer.C while we were waiting for the
 | |
| 					// results. Terminate this goroutine.
 | |
| 				case resultC <- result{chunk: chunk, err: err}:
 | |
| 					// Send the result to the parrent goroutine.
 | |
| 				}
 | |
| 				return
 | |
| 
 | |
| 			}
 | |
| 			select {
 | |
| 			case <-quitC:
 | |
| 				// NetStore.Get function has returned, possibly
 | |
| 				// by the timer.C, which makes this goroutine
 | |
| 				// not needed.
 | |
| 				return
 | |
| 			case <-limiter.C:
 | |
| 			}
 | |
| 			// Reset the limiter for the next iteration.
 | |
| 			limiter.Reset(netStoreMinRetryDelay)
 | |
| 			log.Debug("NetStore.Get retry chunk", "key", addr)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	select {
 | |
| 	case r := <-resultC:
 | |
| 		return r.chunk, r.err
 | |
| 	case <-timer.C:
 | |
| 		return nil, ErrChunkNotFound
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // GetWithTimeout makes a single retrieval attempt for a chunk with a explicit timeout parameter
 | |
| func (ns *NetStore) GetWithTimeout(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) {
 | |
| 	return ns.get(ctx, addr, timeout)
 | |
| }
 | |
| 
 | |
| func (ns *NetStore) get(ctx context.Context, addr Address, timeout time.Duration) (chunk *Chunk, err error) {
 | |
| 	if timeout == 0 {
 | |
| 		timeout = searchTimeout
 | |
| 	}
 | |
| 
 | |
| 	var sp opentracing.Span
 | |
| 	ctx, sp = spancontext.StartSpan(
 | |
| 		ctx,
 | |
| 		"netstore.get")
 | |
| 	defer sp.Finish()
 | |
| 
 | |
| 	if ns.retrieve == nil {
 | |
| 		chunk, err = ns.localStore.Get(ctx, addr)
 | |
| 		if err == nil {
 | |
| 			return chunk, nil
 | |
| 		}
 | |
| 		if err != ErrFetching {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	} else {
 | |
| 		var created bool
 | |
| 		chunk, created = ns.localStore.GetOrCreateRequest(ctx, addr)
 | |
| 
 | |
| 		if chunk.ReqC == nil {
 | |
| 			return chunk, nil
 | |
| 		}
 | |
| 
 | |
| 		if created {
 | |
| 			err := ns.retrieve(ctx, chunk)
 | |
| 			if err != nil {
 | |
| 				// mark chunk request as failed so that we can retry it later
 | |
| 				chunk.SetErrored(ErrChunkUnavailable)
 | |
| 				return nil, err
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	t := time.NewTicker(timeout)
 | |
| 	defer t.Stop()
 | |
| 
 | |
| 	select {
 | |
| 	case <-t.C:
 | |
| 		// mark chunk request as failed so that we can retry
 | |
| 		chunk.SetErrored(ErrChunkNotFound)
 | |
| 		return nil, ErrChunkNotFound
 | |
| 	case <-chunk.ReqC:
 | |
| 	}
 | |
| 	chunk.SetErrored(nil)
 | |
| 	return chunk, nil
 | |
| }
 | |
| 
 | |
| // Put is the entrypoint for local store requests coming from storeLoop
 | |
| func (ns *NetStore) Put(ctx context.Context, chunk *Chunk) {
 | |
| 	ns.localStore.Put(ctx, chunk)
 | |
| }
 | |
| 
 | |
| // Close chunk store
 | |
| func (ns *NetStore) Close() {
 | |
| 	ns.localStore.Close()
 | |
| }
 |