284 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			284 lines
		
	
	
		
			6.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|   | // Copyright 2016 The go-ethereum Authors | ||
|  | // This file is part of the go-ethereum library. | ||
|  | // | ||
|  | // The go-ethereum library is free software: you can redistribute it and/or modify | ||
|  | // it under the terms of the GNU Lesser General Public License as published by | ||
|  | // the Free Software Foundation, either version 3 of the License, or | ||
|  | // (at your option) any later version. | ||
|  | // | ||
|  | // The go-ethereum library is distributed in the hope that it will be useful, | ||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
|  | // GNU Lesser General Public License for more details. | ||
|  | // | ||
|  | // You should have received a copy of the GNU Lesser General Public License | ||
|  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | ||
|  | 
 | ||
|  | package api | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"bufio" | ||
|  | 	"fmt" | ||
|  | 	"io" | ||
|  | 	"net/http" | ||
|  | 	"os" | ||
|  | 	"path/filepath" | ||
|  | 	"sync" | ||
|  | 
 | ||
|  | 	"github.com/ethereum/go-ethereum/common" | ||
|  | 	"github.com/ethereum/go-ethereum/logger" | ||
|  | 	"github.com/ethereum/go-ethereum/logger/glog" | ||
|  | 	"github.com/ethereum/go-ethereum/swarm/storage" | ||
|  | ) | ||
|  | 
 | ||
|  | const maxParallelFiles = 5 | ||
|  | 
 | ||
|  | type FileSystem struct { | ||
|  | 	api *Api | ||
|  | } | ||
|  | 
 | ||
|  | func NewFileSystem(api *Api) *FileSystem { | ||
|  | 	return &FileSystem{api} | ||
|  | } | ||
|  | 
 | ||
|  | // Upload replicates a local directory as a manifest file and uploads it | ||
|  | // using dpa store | ||
|  | // TODO: localpath should point to a manifest | ||
|  | func (self *FileSystem) Upload(lpath, index string) (string, error) { | ||
|  | 	var list []*manifestTrieEntry | ||
|  | 	localpath, err := filepath.Abs(filepath.Clean(lpath)) | ||
|  | 	if err != nil { | ||
|  | 		return "", err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	f, err := os.Open(localpath) | ||
|  | 	if err != nil { | ||
|  | 		return "", err | ||
|  | 	} | ||
|  | 	stat, err := f.Stat() | ||
|  | 	if err != nil { | ||
|  | 		return "", err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	var start int | ||
|  | 	if stat.IsDir() { | ||
|  | 		start = len(localpath) | ||
|  | 		glog.V(logger.Debug).Infof("uploading '%s'", localpath) | ||
|  | 		err = filepath.Walk(localpath, func(path string, info os.FileInfo, err error) error { | ||
|  | 			if (err == nil) && !info.IsDir() { | ||
|  | 				//fmt.Printf("lp %s  path %s\n", localpath, path) | ||
|  | 				if len(path) <= start { | ||
|  | 					return fmt.Errorf("Path is too short") | ||
|  | 				} | ||
|  | 				if path[:start] != localpath { | ||
|  | 					return fmt.Errorf("Path prefix of '%s' does not match localpath '%s'", path, localpath) | ||
|  | 				} | ||
|  | 				entry := &manifestTrieEntry{ | ||
|  | 					Path: filepath.ToSlash(path), | ||
|  | 				} | ||
|  | 				list = append(list, entry) | ||
|  | 			} | ||
|  | 			return err | ||
|  | 		}) | ||
|  | 		if err != nil { | ||
|  | 			return "", err | ||
|  | 		} | ||
|  | 	} else { | ||
|  | 		dir := filepath.Dir(localpath) | ||
|  | 		start = len(dir) | ||
|  | 		if len(localpath) <= start { | ||
|  | 			return "", fmt.Errorf("Path is too short") | ||
|  | 		} | ||
|  | 		if localpath[:start] != dir { | ||
|  | 			return "", fmt.Errorf("Path prefix of '%s' does not match dir '%s'", localpath, dir) | ||
|  | 		} | ||
|  | 		entry := &manifestTrieEntry{ | ||
|  | 			Path: filepath.ToSlash(localpath), | ||
|  | 		} | ||
|  | 		list = append(list, entry) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	cnt := len(list) | ||
|  | 	errors := make([]error, cnt) | ||
|  | 	done := make(chan bool, maxParallelFiles) | ||
|  | 	dcnt := 0 | ||
|  | 	awg := &sync.WaitGroup{} | ||
|  | 
 | ||
|  | 	for i, entry := range list { | ||
|  | 		if i >= dcnt+maxParallelFiles { | ||
|  | 			<-done | ||
|  | 			dcnt++ | ||
|  | 		} | ||
|  | 		awg.Add(1) | ||
|  | 		go func(i int, entry *manifestTrieEntry, done chan bool) { | ||
|  | 			f, err := os.Open(entry.Path) | ||
|  | 			if err == nil { | ||
|  | 				stat, _ := f.Stat() | ||
|  | 				var hash storage.Key | ||
|  | 				wg := &sync.WaitGroup{} | ||
|  | 				hash, err = self.api.dpa.Store(f, stat.Size(), wg, nil) | ||
|  | 				if hash != nil { | ||
|  | 					list[i].Hash = hash.String() | ||
|  | 				} | ||
|  | 				wg.Wait() | ||
|  | 				awg.Done() | ||
|  | 				if err == nil { | ||
|  | 					first512 := make([]byte, 512) | ||
|  | 					fread, _ := f.ReadAt(first512, 0) | ||
|  | 					if fread > 0 { | ||
|  | 						mimeType := http.DetectContentType(first512[:fread]) | ||
|  | 						if filepath.Ext(entry.Path) == ".css" { | ||
|  | 							mimeType = "text/css" | ||
|  | 						} | ||
|  | 						list[i].ContentType = mimeType | ||
|  | 					} | ||
|  | 				} | ||
|  | 				f.Close() | ||
|  | 			} | ||
|  | 			errors[i] = err | ||
|  | 			done <- true | ||
|  | 		}(i, entry, done) | ||
|  | 	} | ||
|  | 	for dcnt < cnt { | ||
|  | 		<-done | ||
|  | 		dcnt++ | ||
|  | 	} | ||
|  | 
 | ||
|  | 	trie := &manifestTrie{ | ||
|  | 		dpa: self.api.dpa, | ||
|  | 	} | ||
|  | 	quitC := make(chan bool) | ||
|  | 	for i, entry := range list { | ||
|  | 		if errors[i] != nil { | ||
|  | 			return "", errors[i] | ||
|  | 		} | ||
|  | 		entry.Path = RegularSlashes(entry.Path[start:]) | ||
|  | 		if entry.Path == index { | ||
|  | 			ientry := &manifestTrieEntry{ | ||
|  | 				Path:        "", | ||
|  | 				Hash:        entry.Hash, | ||
|  | 				ContentType: entry.ContentType, | ||
|  | 			} | ||
|  | 			trie.addEntry(ientry, quitC) | ||
|  | 		} | ||
|  | 		trie.addEntry(entry, quitC) | ||
|  | 	} | ||
|  | 
 | ||
|  | 	err2 := trie.recalcAndStore() | ||
|  | 	var hs string | ||
|  | 	if err2 == nil { | ||
|  | 		hs = trie.hash.String() | ||
|  | 	} | ||
|  | 	awg.Wait() | ||
|  | 	return hs, err2 | ||
|  | } | ||
|  | 
 | ||
|  | // Download replicates the manifest path structure on the local filesystem | ||
|  | // under localpath | ||
|  | func (self *FileSystem) Download(bzzpath, localpath string) error { | ||
|  | 	lpath, err := filepath.Abs(filepath.Clean(localpath)) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 	err = os.MkdirAll(lpath, os.ModePerm) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	//resolving host and port | ||
|  | 	key, _, path, err := self.api.parseAndResolve(bzzpath, true) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	if len(path) > 0 { | ||
|  | 		path += "/" | ||
|  | 	} | ||
|  | 
 | ||
|  | 	quitC := make(chan bool) | ||
|  | 	trie, err := loadManifest(self.api.dpa, key, quitC) | ||
|  | 	if err != nil { | ||
|  | 		glog.V(logger.Warn).Infof("fs.Download: loadManifestTrie error: %v", err) | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	type downloadListEntry struct { | ||
|  | 		key  storage.Key | ||
|  | 		path string | ||
|  | 	} | ||
|  | 
 | ||
|  | 	var list []*downloadListEntry | ||
|  | 	var mde error | ||
|  | 
 | ||
|  | 	prevPath := lpath | ||
|  | 	err = trie.listWithPrefix(path, quitC, func(entry *manifestTrieEntry, suffix string) { | ||
|  | 		glog.V(logger.Detail).Infof("fs.Download: %#v", entry) | ||
|  | 
 | ||
|  | 		key = common.Hex2Bytes(entry.Hash) | ||
|  | 		path := lpath + "/" + suffix | ||
|  | 		dir := filepath.Dir(path) | ||
|  | 		if dir != prevPath { | ||
|  | 			mde = os.MkdirAll(dir, os.ModePerm) | ||
|  | 			prevPath = dir | ||
|  | 		} | ||
|  | 		if (mde == nil) && (path != dir+"/") { | ||
|  | 			list = append(list, &downloadListEntry{key: key, path: path}) | ||
|  | 		} | ||
|  | 	}) | ||
|  | 	if err != nil { | ||
|  | 		return err | ||
|  | 	} | ||
|  | 
 | ||
|  | 	wg := sync.WaitGroup{} | ||
|  | 	errC := make(chan error) | ||
|  | 	done := make(chan bool, maxParallelFiles) | ||
|  | 	for i, entry := range list { | ||
|  | 		select { | ||
|  | 		case done <- true: | ||
|  | 			wg.Add(1) | ||
|  | 		case <-quitC: | ||
|  | 			return fmt.Errorf("aborted") | ||
|  | 		} | ||
|  | 		go func(i int, entry *downloadListEntry) { | ||
|  | 			defer wg.Done() | ||
|  | 			f, err := os.Create(entry.path) // TODO: path separators | ||
|  | 			if err == nil { | ||
|  | 
 | ||
|  | 				reader := self.api.dpa.Retrieve(entry.key) | ||
|  | 				writer := bufio.NewWriter(f) | ||
|  | 				size, err := reader.Size(quitC) | ||
|  | 				if err == nil { | ||
|  | 					_, err = io.CopyN(writer, reader, size) // TODO: handle errors | ||
|  | 					err2 := writer.Flush() | ||
|  | 					if err == nil { | ||
|  | 						err = err2 | ||
|  | 					} | ||
|  | 					err2 = f.Close() | ||
|  | 					if err == nil { | ||
|  | 						err = err2 | ||
|  | 					} | ||
|  | 				} | ||
|  | 			} | ||
|  | 			if err != nil { | ||
|  | 				select { | ||
|  | 				case errC <- err: | ||
|  | 				case <-quitC: | ||
|  | 				} | ||
|  | 				return | ||
|  | 			} | ||
|  | 			<-done | ||
|  | 		}(i, entry) | ||
|  | 	} | ||
|  | 	go func() { | ||
|  | 		wg.Wait() | ||
|  | 		close(errC) | ||
|  | 	}() | ||
|  | 	select { | ||
|  | 	case err = <-errC: | ||
|  | 		return err | ||
|  | 	case <-quitC: | ||
|  | 		return fmt.Errorf("aborted") | ||
|  | 	} | ||
|  | 
 | ||
|  | } |