* cmd/swarm: minor cli flag text adjustments * swarm/api/http: sticky footer for swarm landing page using flex * swarm/api/http: sticky footer for error pages and fix for multiple choices * cmd/swarm, swarm/storage, swarm: fix mingw on windows test issues * cmd/swarm: update description of swarm cmd * swarm: added network ID test * cmd/swarm: support for smoke tests on the production swarm cluster * cmd/swarm/swarm-smoke: simplify cluster logic as per suggestion * swarm: propagate ctx to internal apis (#754) * swarm/metrics: collect disk measurements * swarm/bmt: fix io.Writer interface * Write now tolerates arbitrary variable buffers * added variable buffer tests * Write loop and finalise optimisation * refactor / rename * add tests for empty input * swarm/pss: (UPDATE) Generic notifications package (#744) swarm/pss: Generic package for creating pss notification svcs * swarm: Adding context to more functions * swarm/api: change colour of landing page in templates * swarm/api: change landing page to react to enter keypress
		
			
				
	
	
		
			292 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			292 lines
		
	
	
		
			6.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2016 The go-ethereum Authors
 | 
						|
// This file is part of the go-ethereum library.
 | 
						|
//
 | 
						|
// The go-ethereum library is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Lesser General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// The go-ethereum library is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
						|
// GNU Lesser General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Lesser General Public License
 | 
						|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package api
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net/http"
 | 
						|
	"os"
 | 
						|
	"path"
 | 
						|
	"path/filepath"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/common"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/log"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/storage"
 | 
						|
)
 | 
						|
 | 
						|
const maxParallelFiles = 5
 | 
						|
 | 
						|
type FileSystem struct {
 | 
						|
	api *API
 | 
						|
}
 | 
						|
 | 
						|
func NewFileSystem(api *API) *FileSystem {
 | 
						|
	return &FileSystem{api}
 | 
						|
}
 | 
						|
 | 
						|
// Upload replicates a local directory as a manifest file and uploads it
 | 
						|
// using FileStore store
 | 
						|
// This function waits the chunks to be stored.
 | 
						|
// TODO: localpath should point to a manifest
 | 
						|
//
 | 
						|
// DEPRECATED: Use the HTTP API instead
 | 
						|
func (fs *FileSystem) Upload(lpath, index string, toEncrypt bool) (string, error) {
 | 
						|
	var list []*manifestTrieEntry
 | 
						|
	localpath, err := filepath.Abs(filepath.Clean(lpath))
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	f, err := os.Open(localpath)
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
	stat, err := f.Stat()
 | 
						|
	if err != nil {
 | 
						|
		return "", err
 | 
						|
	}
 | 
						|
 | 
						|
	var start int
 | 
						|
	if stat.IsDir() {
 | 
						|
		start = len(localpath)
 | 
						|
		log.Debug(fmt.Sprintf("uploading '%s'", localpath))
 | 
						|
		err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error {
 | 
						|
			if (err == nil) && !info.IsDir() {
 | 
						|
				if len(path) <= start {
 | 
						|
					return fmt.Errorf("Path is too short")
 | 
						|
				}
 | 
						|
				if path[:start] != localpath {
 | 
						|
					return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath)
 | 
						|
				}
 | 
						|
				entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(path)}, nil)
 | 
						|
				list = append(list, entry)
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		})
 | 
						|
		if err != nil {
 | 
						|
			return "", err
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		dir := filepath.Dir(localpath)
 | 
						|
		start = len(dir)
 | 
						|
		if len(localpath) <= start {
 | 
						|
			return "", fmt.Errorf("Path is too short")
 | 
						|
		}
 | 
						|
		if localpath[:start] != dir {
 | 
						|
			return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir)
 | 
						|
		}
 | 
						|
		entry := newManifestTrieEntry(&ManifestEntry{Path: filepath.ToSlash(localpath)}, nil)
 | 
						|
		list = append(list, entry)
 | 
						|
	}
 | 
						|
 | 
						|
	cnt := len(list)
 | 
						|
	errors := make([]error, cnt)
 | 
						|
	done := make(chan bool, maxParallelFiles)
 | 
						|
	dcnt := 0
 | 
						|
	awg := &sync.WaitGroup{}
 | 
						|
 | 
						|
	for i, entry := range list {
 | 
						|
		if i >= dcnt+maxParallelFiles {
 | 
						|
			<-done
 | 
						|
			dcnt++
 | 
						|
		}
 | 
						|
		awg.Add(1)
 | 
						|
		go func(i int, entry *manifestTrieEntry, done chan bool) {
 | 
						|
			f, err := os.Open(entry.Path)
 | 
						|
			if err == nil {
 | 
						|
				stat, _ := f.Stat()
 | 
						|
				var hash storage.Address
 | 
						|
				var wait func(context.Context) error
 | 
						|
				ctx := context.TODO()
 | 
						|
				hash, wait, err = fs.api.fileStore.Store(ctx, f, stat.Size(), toEncrypt)
 | 
						|
				if hash != nil {
 | 
						|
					list[i].Hash = hash.Hex()
 | 
						|
				}
 | 
						|
				err = wait(ctx)
 | 
						|
				awg.Done()
 | 
						|
				if err == nil {
 | 
						|
					first512 := make([]byte, 512)
 | 
						|
					fread, _ := f.ReadAt(first512, 0)
 | 
						|
					if fread > 0 {
 | 
						|
						mimeType := http.DetectContentType(first512[:fread])
 | 
						|
						if filepath.Ext(entry.Path) == ".css" {
 | 
						|
							mimeType = "text/css"
 | 
						|
						}
 | 
						|
						list[i].ContentType = mimeType
 | 
						|
					}
 | 
						|
				}
 | 
						|
				f.Close()
 | 
						|
			}
 | 
						|
			errors[i] = err
 | 
						|
			done <- true
 | 
						|
		}(i, entry, done)
 | 
						|
	}
 | 
						|
	for dcnt < cnt {
 | 
						|
		<-done
 | 
						|
		dcnt++
 | 
						|
	}
 | 
						|
 | 
						|
	trie := &manifestTrie{
 | 
						|
		fileStore: fs.api.fileStore,
 | 
						|
	}
 | 
						|
	quitC := make(chan bool)
 | 
						|
	for i, entry := range list {
 | 
						|
		if errors[i] != nil {
 | 
						|
			return "", errors[i]
 | 
						|
		}
 | 
						|
		entry.Path = RegularSlashes(entry.Path[start:])
 | 
						|
		if entry.Path == index {
 | 
						|
			ientry := newManifestTrieEntry(&ManifestEntry{
 | 
						|
				ContentType: entry.ContentType,
 | 
						|
			}, nil)
 | 
						|
			ientry.Hash = entry.Hash
 | 
						|
			trie.addEntry(ientry, quitC)
 | 
						|
		}
 | 
						|
		trie.addEntry(entry, quitC)
 | 
						|
	}
 | 
						|
 | 
						|
	err2 := trie.recalcAndStore()
 | 
						|
	var hs string
 | 
						|
	if err2 == nil {
 | 
						|
		hs = trie.ref.Hex()
 | 
						|
	}
 | 
						|
	awg.Wait()
 | 
						|
	return hs, err2
 | 
						|
}
 | 
						|
 | 
						|
// Download replicates the manifest basePath structure on the local filesystem
 | 
						|
// under localpath
 | 
						|
//
 | 
						|
// DEPRECATED: Use the HTTP API instead
 | 
						|
func (fs *FileSystem) Download(bzzpath, localpath string) error {
 | 
						|
	lpath, err := filepath.Abs(filepath.Clean(localpath))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	err = os.MkdirAll(lpath, os.ModePerm)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	//resolving host and port
 | 
						|
	uri, err := Parse(path.Join("bzz:/", bzzpath))
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	addr, err := fs.api.Resolve(context.TODO(), uri)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	path := uri.Path
 | 
						|
 | 
						|
	if len(path) > 0 {
 | 
						|
		path += "/"
 | 
						|
	}
 | 
						|
 | 
						|
	quitC := make(chan bool)
 | 
						|
	trie, err := loadManifest(context.TODO(), fs.api.fileStore, addr, quitC)
 | 
						|
	if err != nil {
 | 
						|
		log.Warn(fmt.Sprintf("fs.Download: loadManifestTrie error: %v", err))
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	type downloadListEntry struct {
 | 
						|
		addr storage.Address
 | 
						|
		path string
 | 
						|
	}
 | 
						|
 | 
						|
	var list []*downloadListEntry
 | 
						|
	var mde error
 | 
						|
 | 
						|
	prevPath := lpath
 | 
						|
	err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) {
 | 
						|
		log.Trace(fmt.Sprintf("fs.Download: %#v", entry))
 | 
						|
 | 
						|
		addr = common.Hex2Bytes(entry.Hash)
 | 
						|
		path := lpath + "/" + suffix
 | 
						|
		dir := filepath.Dir(path)
 | 
						|
		if dir != prevPath {
 | 
						|
			mde = os.MkdirAll(dir, os.ModePerm)
 | 
						|
			prevPath = dir
 | 
						|
		}
 | 
						|
		if (mde == nil) && (path != dir+"/") {
 | 
						|
			list = append(list, &downloadListEntry{addr: addr, path: path})
 | 
						|
		}
 | 
						|
	})
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	wg := sync.WaitGroup{}
 | 
						|
	errC := make(chan error)
 | 
						|
	done := make(chan bool, maxParallelFiles)
 | 
						|
	for i, entry := range list {
 | 
						|
		select {
 | 
						|
		case done <- true:
 | 
						|
			wg.Add(1)
 | 
						|
		case <-quitC:
 | 
						|
			return fmt.Errorf("aborted")
 | 
						|
		}
 | 
						|
		go func(i int, entry *downloadListEntry) {
 | 
						|
			defer wg.Done()
 | 
						|
			err := retrieveToFile(quitC, fs.api.fileStore, entry.addr, entry.path)
 | 
						|
			if err != nil {
 | 
						|
				select {
 | 
						|
				case errC <- err:
 | 
						|
				case <-quitC:
 | 
						|
				}
 | 
						|
				return
 | 
						|
			}
 | 
						|
			<-done
 | 
						|
		}(i, entry)
 | 
						|
	}
 | 
						|
	go func() {
 | 
						|
		wg.Wait()
 | 
						|
		close(errC)
 | 
						|
	}()
 | 
						|
	select {
 | 
						|
	case err = <-errC:
 | 
						|
		return err
 | 
						|
	case <-quitC:
 | 
						|
		return fmt.Errorf("aborted")
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func retrieveToFile(quitC chan bool, fileStore *storage.FileStore, addr storage.Address, path string) error {
 | 
						|
	f, err := os.Create(path) // TODO: basePath separators
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	reader, _ := fileStore.Retrieve(context.TODO(), addr)
 | 
						|
	writer := bufio.NewWriter(f)
 | 
						|
	size, err := reader.Size(quitC)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if _, err = io.CopyN(writer, reader, size); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if err := writer.Flush(); err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	return f.Close()
 | 
						|
}
 |