298 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
		
		
			
		
	
	
			298 lines
		
	
	
		
			8.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
|   | // Copyright 2018 The go-ethereum Authors | ||
|  | // This file is part of the go-ethereum library. | ||
|  | // | ||
|  | // The go-ethereum library is free software: you can redistribute it and/or modify | ||
|  | // it under the terms of the GNU Lesser General Public License as published by | ||
|  | // the Free Software Foundation, either version 3 of the License, or | ||
|  | // (at your option) any later version. | ||
|  | // | ||
|  | // The go-ethereum library is distributed in the hope that it will be useful, | ||
|  | // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
|  | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
|  | // GNU Lesser General Public License for more details. | ||
|  | // | ||
|  | // You should have received a copy of the GNU Lesser General Public License | ||
|  | // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. | ||
|  | 
 | ||
|  | package stream | ||
|  | 
 | ||
|  | import ( | ||
|  | 	"math" | ||
|  | 	"strconv" | ||
|  | 	"time" | ||
|  | 
 | ||
|  | 	"github.com/ethereum/go-ethereum/metrics" | ||
|  | 	"github.com/ethereum/go-ethereum/swarm/log" | ||
|  | 	"github.com/ethereum/go-ethereum/swarm/storage" | ||
|  | ) | ||
|  | 
 | ||
|  | const ( | ||
|  | 	// BatchSize = 2 | ||
|  | 	BatchSize = 128 | ||
|  | ) | ||
|  | 
 | ||
|  | // SwarmSyncerServer implements an Server for history syncing on bins | ||
|  | // offered streams: | ||
|  | // * live request delivery with or without checkback | ||
|  | // * (live/non-live historical) chunk syncing per proximity bin | ||
|  | type SwarmSyncerServer struct { | ||
|  | 	po        uint8 | ||
|  | 	db        *storage.DBAPI | ||
|  | 	sessionAt uint64 | ||
|  | 	start     uint64 | ||
|  | 	quit      chan struct{} | ||
|  | } | ||
|  | 
 | ||
|  | // NewSwarmSyncerServer is contructor for SwarmSyncerServer | ||
|  | func NewSwarmSyncerServer(live bool, po uint8, db *storage.DBAPI) (*SwarmSyncerServer, error) { | ||
|  | 	sessionAt := db.CurrentBucketStorageIndex(po) | ||
|  | 	var start uint64 | ||
|  | 	if live { | ||
|  | 		start = sessionAt | ||
|  | 	} | ||
|  | 	return &SwarmSyncerServer{ | ||
|  | 		po:        po, | ||
|  | 		db:        db, | ||
|  | 		sessionAt: sessionAt, | ||
|  | 		start:     start, | ||
|  | 		quit:      make(chan struct{}), | ||
|  | 	}, nil | ||
|  | } | ||
|  | 
 | ||
|  | func RegisterSwarmSyncerServer(streamer *Registry, db *storage.DBAPI) { | ||
|  | 	streamer.RegisterServerFunc("SYNC", func(p *Peer, t string, live bool) (Server, error) { | ||
|  | 		po, err := ParseSyncBinKey(t) | ||
|  | 		if err != nil { | ||
|  | 			return nil, err | ||
|  | 		} | ||
|  | 		return NewSwarmSyncerServer(live, po, db) | ||
|  | 	}) | ||
|  | 	// streamer.RegisterServerFunc(stream, func(p *Peer) (Server, error) { | ||
|  | 	// 	return NewOutgoingProvableSwarmSyncer(po, db) | ||
|  | 	// }) | ||
|  | } | ||
|  | 
 | ||
|  | // Close needs to be called on a stream server | ||
|  | func (s *SwarmSyncerServer) Close() { | ||
|  | 	close(s.quit) | ||
|  | } | ||
|  | 
 | ||
|  | // GetSection retrieves the actual chunk from localstore | ||
|  | func (s *SwarmSyncerServer) GetData(key []byte) ([]byte, error) { | ||
|  | 	chunk, err := s.db.Get(storage.Address(key)) | ||
|  | 	if err == storage.ErrFetching { | ||
|  | 		<-chunk.ReqC | ||
|  | 	} else if err != nil { | ||
|  | 		return nil, err | ||
|  | 	} | ||
|  | 	return chunk.SData, nil | ||
|  | } | ||
|  | 
 | ||
|  | // GetBatch retrieves the next batch of hashes from the dbstore | ||
|  | func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { | ||
|  | 	var batch []byte | ||
|  | 	i := 0 | ||
|  | 	if from == 0 { | ||
|  | 		from = s.start | ||
|  | 	} | ||
|  | 	if to <= from || from >= s.sessionAt { | ||
|  | 		to = math.MaxUint64 | ||
|  | 	} | ||
|  | 	var ticker *time.Ticker | ||
|  | 	defer func() { | ||
|  | 		if ticker != nil { | ||
|  | 			ticker.Stop() | ||
|  | 		} | ||
|  | 	}() | ||
|  | 	var wait bool | ||
|  | 	for { | ||
|  | 		if wait { | ||
|  | 			if ticker == nil { | ||
|  | 				ticker = time.NewTicker(1000 * time.Millisecond) | ||
|  | 			} | ||
|  | 			select { | ||
|  | 			case <-ticker.C: | ||
|  | 			case <-s.quit: | ||
|  | 				return nil, 0, 0, nil, nil | ||
|  | 			} | ||
|  | 		} | ||
|  | 
 | ||
|  | 		metrics.GetOrRegisterCounter("syncer.setnextbatch.iterator", nil).Inc(1) | ||
|  | 		err := s.db.Iterator(from, to, s.po, func(addr storage.Address, idx uint64) bool { | ||
|  | 			batch = append(batch, addr[:]...) | ||
|  | 			i++ | ||
|  | 			to = idx | ||
|  | 			return i < BatchSize | ||
|  | 		}) | ||
|  | 		if err != nil { | ||
|  | 			return nil, 0, 0, nil, err | ||
|  | 		} | ||
|  | 		if len(batch) > 0 { | ||
|  | 			break | ||
|  | 		} | ||
|  | 		wait = true | ||
|  | 	} | ||
|  | 
 | ||
|  | 	log.Trace("Swarm syncer offer batch", "po", s.po, "len", i, "from", from, "to", to, "current store count", s.db.CurrentBucketStorageIndex(s.po)) | ||
|  | 	return batch, from, to, nil, nil | ||
|  | } | ||
|  | 
 | ||
|  | // SwarmSyncerClient | ||
|  | type SwarmSyncerClient struct { | ||
|  | 	sessionAt     uint64 | ||
|  | 	nextC         chan struct{} | ||
|  | 	sessionRoot   storage.Address | ||
|  | 	sessionReader storage.LazySectionReader | ||
|  | 	retrieveC     chan *storage.Chunk | ||
|  | 	storeC        chan *storage.Chunk | ||
|  | 	db            *storage.DBAPI | ||
|  | 	// chunker               storage.Chunker | ||
|  | 	currentRoot           storage.Address | ||
|  | 	requestFunc           func(chunk *storage.Chunk) | ||
|  | 	end, start            uint64 | ||
|  | 	peer                  *Peer | ||
|  | 	ignoreExistingRequest bool | ||
|  | 	stream                Stream | ||
|  | } | ||
|  | 
 | ||
|  | // NewSwarmSyncerClient is a contructor for provable data exchange syncer | ||
|  | func NewSwarmSyncerClient(p *Peer, db *storage.DBAPI, ignoreExistingRequest bool, stream Stream) (*SwarmSyncerClient, error) { | ||
|  | 	return &SwarmSyncerClient{ | ||
|  | 		db:   db, | ||
|  | 		peer: p, | ||
|  | 		ignoreExistingRequest: ignoreExistingRequest, | ||
|  | 		stream:                stream, | ||
|  | 	}, nil | ||
|  | } | ||
|  | 
 | ||
|  | // // NewIncomingProvableSwarmSyncer is a contructor for provable data exchange syncer | ||
|  | // func NewIncomingProvableSwarmSyncer(po int, priority int, index uint64, sessionAt uint64, intervals []uint64, sessionRoot storage.Key, chunker *storage.PyramidChunker, store storage.ChunkStore, p Peer) *SwarmSyncerClient { | ||
|  | // 	retrieveC := make(storage.Chunk, chunksCap) | ||
|  | // 	RunChunkRequestor(p, retrieveC) | ||
|  | // 	storeC := make(storage.Chunk, chunksCap) | ||
|  | // 	RunChunkStorer(store, storeC) | ||
|  | // 	s := &SwarmSyncerClient{ | ||
|  | // 		po:            po, | ||
|  | // 		priority:      priority, | ||
|  | // 		sessionAt:     sessionAt, | ||
|  | // 		start:         index, | ||
|  | // 		end:           index, | ||
|  | // 		nextC:         make(chan struct{}, 1), | ||
|  | // 		intervals:     intervals, | ||
|  | // 		sessionRoot:   sessionRoot, | ||
|  | // 		sessionReader: chunker.Join(sessionRoot, retrieveC), | ||
|  | // 		retrieveC:     retrieveC, | ||
|  | // 		storeC:        storeC, | ||
|  | // 	} | ||
|  | // 	return s | ||
|  | // } | ||
|  | 
 | ||
|  | // // StartSyncing is called on the Peer to start the syncing process | ||
|  | // // the idea is that it is called only after kademlia is close to healthy | ||
|  | // func StartSyncing(s *Streamer, peerId discover.NodeID, po uint8, nn bool) { | ||
|  | // 	lastPO := po | ||
|  | // 	if nn { | ||
|  | // 		lastPO = maxPO | ||
|  | // 	} | ||
|  | // | ||
|  | // 	for i := po; i <= lastPO; i++ { | ||
|  | // 		s.Subscribe(peerId, "SYNC", newSyncLabel("LIVE", po), 0, 0, High, true) | ||
|  | // 		s.Subscribe(peerId, "SYNC", newSyncLabel("HISTORY", po), 0, 0, Mid, false) | ||
|  | // 	} | ||
|  | // } | ||
|  | 
 | ||
|  | // RegisterSwarmSyncerClient registers the client constructor function for | ||
|  | // to handle incoming sync streams | ||
|  | func RegisterSwarmSyncerClient(streamer *Registry, db *storage.DBAPI) { | ||
|  | 	streamer.RegisterClientFunc("SYNC", func(p *Peer, t string, live bool) (Client, error) { | ||
|  | 		return NewSwarmSyncerClient(p, db, true, NewStream("SYNC", t, live)) | ||
|  | 	}) | ||
|  | } | ||
|  | 
 | ||
|  | // NeedData | ||
|  | func (s *SwarmSyncerClient) NeedData(key []byte) (wait func()) { | ||
|  | 	chunk, _ := s.db.GetOrCreateRequest(key) | ||
|  | 	// TODO: we may want to request from this peer anyway even if the request exists | ||
|  | 
 | ||
|  | 	// ignoreExistingRequest is temporary commented out until its functionality is verified. | ||
|  | 	// For now, this optimization can be disabled. | ||
|  | 	if chunk.ReqC == nil { //|| (s.ignoreExistingRequest && !created) { | ||
|  | 		return nil | ||
|  | 	} | ||
|  | 	// create request and wait until the chunk data arrives and is stored | ||
|  | 	return func() { | ||
|  | 		chunk.WaitToStore() | ||
|  | 	} | ||
|  | } | ||
|  | 
 | ||
|  | // BatchDone | ||
|  | func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) { | ||
|  | 	// TODO: reenable this with putter/getter refactored code | ||
|  | 	// if s.chunker != nil { | ||
|  | 	// 	return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) } | ||
|  | 	// } | ||
|  | 	return nil | ||
|  | } | ||
|  | 
 | ||
|  | func (s *SwarmSyncerClient) TakeoverProof(stream Stream, from uint64, hashes []byte, root storage.Address) (*TakeoverProof, error) { | ||
|  | 	// for provable syncer currentRoot is non-zero length | ||
|  | 	// TODO: reenable this with putter/getter | ||
|  | 	// if s.chunker != nil { | ||
|  | 	// 	if from > s.sessionAt { // for live syncing currentRoot is always updated | ||
|  | 	// 		//expRoot, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC, s.storeC) | ||
|  | 	// 		expRoot, _, err := s.chunker.Append(s.currentRoot, bytes.NewReader(hashes), s.retrieveC) | ||
|  | 	// 		if err != nil { | ||
|  | 	// 			return nil, err | ||
|  | 	// 		} | ||
|  | 	// 		if !bytes.Equal(root, expRoot) { | ||
|  | 	// 			return nil, fmt.Errorf("HandoverProof mismatch") | ||
|  | 	// 		} | ||
|  | 	// 		s.currentRoot = root | ||
|  | 	// 	} else { | ||
|  | 	// 		expHashes := make([]byte, len(hashes)) | ||
|  | 	// 		_, err := s.sessionReader.ReadAt(expHashes, int64(s.end*HashSize)) | ||
|  | 	// 		if err != nil && err != io.EOF { | ||
|  | 	// 			return nil, err | ||
|  | 	// 		} | ||
|  | 	// 		if !bytes.Equal(expHashes, hashes) { | ||
|  | 	// 			return nil, errors.New("invalid proof") | ||
|  | 	// 		} | ||
|  | 	// 	} | ||
|  | 	// 	return nil, nil | ||
|  | 	// } | ||
|  | 	s.end += uint64(len(hashes)) / HashSize | ||
|  | 	takeover := &Takeover{ | ||
|  | 		Stream: stream, | ||
|  | 		Start:  s.start, | ||
|  | 		End:    s.end, | ||
|  | 		Root:   root, | ||
|  | 	} | ||
|  | 	// serialise and sign | ||
|  | 	return &TakeoverProof{ | ||
|  | 		Takeover: takeover, | ||
|  | 		Sig:      nil, | ||
|  | 	}, nil | ||
|  | } | ||
|  | 
 | ||
|  | func (s *SwarmSyncerClient) Close() {} | ||
|  | 
 | ||
|  | // base for parsing and formating sync bin key | ||
|  | // it must be 2 <= base <= 36 | ||
|  | const syncBinKeyBase = 36 | ||
|  | 
 | ||
|  | // FormatSyncBinKey returns a string representation of | ||
|  | // Kademlia bin number to be used as key for SYNC stream. | ||
|  | func FormatSyncBinKey(bin uint8) string { | ||
|  | 	return strconv.FormatUint(uint64(bin), syncBinKeyBase) | ||
|  | } | ||
|  | 
 | ||
|  | // ParseSyncBinKey parses the string representation | ||
|  | // and returns the Kademlia bin number. | ||
|  | func ParseSyncBinKey(s string) (uint8, error) { | ||
|  | 	bin, err := strconv.ParseUint(s, syncBinKeyBase, 8) | ||
|  | 	if err != nil { | ||
|  | 		return 0, err | ||
|  | 	} | ||
|  | 	return uint8(bin), nil | ||
|  | } |